oleksiyk / kafka

Apache Kafka 0.9 client for Node
MIT License

Connection time limit #73

Open aheckmann-pebble opened 8 years ago

aheckmann-pebble commented 8 years ago

When a misconfigured connection string is pushed to production, I'd like to know about it as soon as I can. After some configurable time threshold, I'd like this driver to reject attempts to connect to Kafka. Currently the driver keeps retrying forever, silently swallowing connection failures.

Thoughts?
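
For concreteness, here is a caller-side sketch of the behavior being asked for, assuming only that producer.init() returns a promise (as no-kafka's does); the connection string, the 15-second threshold, and the exit policy are illustrative choices:

var Kafka = require('no-kafka');

var producer = new Kafka.Producer({ connectionString: '192.168.0.1:9092' });

// Race init() against a deadline so a bad connection string fails fast
// instead of retrying forever.
function initWithDeadline(producer, ms) {
    var deadline = new Promise(function (resolve, reject) {
        setTimeout(function () {
            reject(new Error('Kafka connect timed out after ' + ms + 'ms'));
        }, ms);
    });
    return Promise.race([producer.init(), deadline]);
}

initWithDeadline(producer, 15000).catch(function (err) {
    console.error(err.message); // surface the misconfiguration immediately
    process.exit(1);            // fail fast instead of silently retrying
});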

oleksiyk commented 8 years ago

I'm streaming all logs to Elastic and then analysing them for failures. This way I'm immediately aware of any problems, but I'm also free not to bother restarting services once the issue is resolved.

aheckmann-pebble commented 8 years ago

That may work OK, but it depends on the latency between the app and the logging services. Ideally we would know within 10-15 seconds that we couldn't connect, so we could immediately stop a bad configuration from rolling out to all production services.

sahilthapar commented 8 years ago

:+1: For a connection retry limit.

memelet commented 8 years ago

We too are streaming all logs to ES, but the application really needs to know what is going on as well. We need to stop consuming from the source when we can't write to Kafka, and we don't want our ops people having to intervene via log alerting; it is much better if the application can be resilient itself. As for restarting, all our systems run in Marathon/Mesos, so in our case restarting is not a manual process.

Any kind of callback for when the broker cannot be contacted would be immensely helpful.

oleksiyk commented 8 years ago

I'm not sure no-kafka will do this. There are so many possible scenarios to handle that if I add an immediate callback for a failed broker, I will then receive new issues requesting new options for this callback: that it should fire only on the 3rd failed request, or wait 5 seconds in the hope that the Kafka cluster will rebalance and remove the failed broker, and so on.

memelet commented 8 years ago

But the callback is exactly what would allow for that kind of extensibility: with a simple callback, all of those strategies could be implemented, or not. I'm not asking for the callback to change any behavior of nokafka, nor even to be synchronous. Just a notification that the broker is disconnected, so the application can take appropriate action.
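
For illustration, suppose the client emitted bare 'brokerDown' / 'brokerUp' events (hypothetical; no-kafka does not emit these today). The threshold and debounce policies from the previous comment could then live entirely in application code:

var EventEmitter = require('events').EventEmitter;

// Fire the handler only after N failures and a grace period, and reset
// if the broker comes back (cluster rebalanced, broker replaced, etc.).
function onBrokerDown(client, opts, handler) {
    var failures = 0;
    var timer = null;
    client.on('brokerDown', function (broker) {
        failures += 1;
        if (failures < opts.minFailures) return; // e.g. fire only on 3rd failure
        if (!timer) {
            timer = setTimeout(function () { handler(broker); }, opts.graceMs);
        }
    });
    client.on('brokerUp', function () {
        failures = 0;
        if (timer) { clearTimeout(timer); timer = null; }
    });
}

// Usage with a stand-in emitter (a real client would emit these itself):
var client = new EventEmitter();
onBrokerDown(client, { minFailures: 3, graceMs: 5000 }, function (broker) {
    console.error('Broker still down after grace period:', broker);
});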

oleksiyk commented 8 years ago

When a broker is down, it is up to the Kafka cluster to take action and reassign partitions to a new broker/leader. And then you'll probably need a callback for when the broker is up again? What if that broker was removed and replaced permanently?

ismriv commented 8 years ago

@oleksiyk that's actually what the Cassandra driver offers: http://docs.datastax.com/en/latest-nodejs-driver-api/Client.html#event:hostAdd

Most use cases won't need this, but if it isn't extremely difficult to implement, it would open up opportunities for apps to deal with failures the way they want.
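
For comparison, listening to those host events with the DataStax Node.js driver looks roughly like this (event names per the API docs linked above):

var cassandra = require('cassandra-driver');

var client = new cassandra.Client({ contactPoints: ['127.0.0.1'] });

// Client is an EventEmitter; applications opt in to topology changes.
client.on('hostAdd', function (host) { console.log('Host added:', host.address); });
client.on('hostDown', function (host) { console.warn('Host down:', host.address); });
client.on('hostUp', function (host) { console.log('Host back up:', host.address); });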

memelet commented 8 years ago

Here is a scenario that just happened to us:

A client could not connect to the broker for many hours:

Metadata request failed: NoKafkaConnectionError [kafka.service.consul:9092]: Connection was aborted before connection was established.
{ [NoKafkaConnectionError: Connection was aborted before connection was established.]
  name: 'NoKafkaConnectionError',
  server: 'kafka.service.consul:9092',
  message: 'Connection was aborted before connection was established.' } 

This was logged, routed to ELK, and alerted on. But the broker was up, and the DNS name kafka.service.consul resolved to the correct IP; nokafka still could not connect to the broker. After restarting the service, nokafka connected and all was well.

But this required manual intervention. Ideally the service using nokafka would determine that it has not been able to reach the broker for too long and terminate itself (letting Marathon restart it).

This is the kind of behavior that a status callback would enable. (The callback would hold all the logic; nokafka would simply need to notify on state changes.)
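
A minimal sketch of that watchdog, assuming a hypothetical status notification (nothing below exists in nokafka today; the threshold is illustrative):

var MAX_DISCONNECTED_MS = 10 * 60 * 1000; // e.g. give up after 10 minutes
var disconnectedSince = null;

// These two functions would be driven by the hypothetical status callback.
function onKafkaDown() {
    if (!disconnectedSince) disconnectedSince = Date.now();
}
function onKafkaUp() {
    disconnectedSince = null;
}

// Terminate so the scheduler (Marathon in our case) restarts the service.
setInterval(function () {
    if (disconnectedSince && Date.now() - disconnectedSince > MAX_DISCONNECTED_MS) {
        console.error('Kafka unreachable for too long; exiting for a restart');
        process.exit(1);
    }
}, 5000);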

Of course the alternative is to intercept the log messages and parse them (and hope they do not change in a later release).
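
A sketch of that brittle fallback, shimming console.error and pattern-matching the error name (it breaks silently if the wording changes between releases):

// Feed the watchdog above by scraping the client's log output.
var originalError = console.error;
console.error = function () {
    var line = Array.prototype.join.call(arguments, ' ');
    if (line.indexOf('NoKafkaConnectionError') !== -1) {
        onKafkaDown(); // from the watchdog sketch above
    }
    return originalError.apply(console, arguments);
};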

oleksiyk commented 8 years ago

The error Connection was aborted before connection was established. can only be raised if the no-kafka client was closed manually, e.g. .end() was called on the instance too early.

aheckmann-pebble commented 8 years ago

Emitting an event when the client is unable to connect after a configurable n attempts sounds like it could meet our needs. FWIW, this general problem space has been solved in the Node.js MongoDB driver as well (connection timeouts, cluster changes, reconnection events, etc.), so it might be a good place to look for guidance; all of their drivers support these behaviors.
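
For reference, the MongoDB Node.js driver of that era exposed both retry limits and connection events, roughly like this (option and event names vary by driver version, so check the docs for yours):

var MongoClient = require('mongodb').MongoClient;

// 2.x-era options: stop reconnecting after a bounded number of attempts
// instead of retrying forever.
MongoClient.connect('mongodb://localhost:27017/test', {
    server: { reconnectTries: 30, reconnectInterval: 1000 }
}, function (err, db) {
    if (err) throw err;
    db.on('close', function () { console.warn('MongoDB connection closed'); });
    db.on('reconnect', function () { console.log('MongoDB reconnected'); });
});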

zer0Id0l commented 8 years ago

Is there any fix available for the connection timeout limit, apart from intercepting the logs and taking the necessary action (as stated above)?

koresar commented 8 years ago

Today we hit this problem as well.

Potentially, this can lead to a memory leak, data loss, or both.

We call producer.send() many times per second, and it accumulates all the pending promises in RAM until the connection comes back. At some point the Node.js process will be killed with an out-of-memory error, losing all the in-memory data.

This also means the producer {retries} option is meaningless when connectivity breaks suddenly: producer.send() never gets a chance to retry because it is stuck in the endless client.updateMetadata() call.
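
In the meantime, a caller-side guard can cap the damage. A sketch, assuming only that producer.send() returns a promise (the cap value is illustrative):

// Cap the number of unresolved send() promises so a dead connection
// fails fast instead of accumulating promises in RAM until OOM.
var MAX_IN_FLIGHT = 10000;
var inFlight = 0;

function guardedSend(producer, message) {
    if (inFlight >= MAX_IN_FLIGHT) {
        return Promise.reject(new Error('Kafka backlog exceeded; dropping message'));
    }
    inFlight += 1;
    function settle() { inFlight -= 1; }
    return producer.send(message).then(function (result) {
        settle();
        return result;
    }, function (err) {
        settle();
        throw err;
    });
}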

Proposal

How about a new option that limits the connection retries, while by default still attempting to reconnect forever?

var producer = new Kafka.Producer({
    reconnectionDelay: {retries: 60, min: 1000, max: 1000}, // reject after ~1 minute
});

var producer = new Kafka.Producer({
    reconnectionDelay: {retries: 0, min: 1000, max: 1000}, // defaults: never rejects
});

koresar commented 8 years ago

I've put up a PR to implement rejections. Please see #131.

orgoldfus commented 7 years ago

I don't know if it's still relevant to anyone, but since no-kafka uses bluebird promises, you can use the timeout function, like this:

producer.init().timeout(3000).catch(function (err) { console.log('Unable to connect', err); })
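
A slightly fuller version that logs the failure and exits so the orchestrator can restart the service (bluebird's .timeout() rejects with a TimeoutError, distinguishable by its name):

producer.init()
    .timeout(3000)
    .catch(function (err) {
        // err.name === 'TimeoutError' when the 3s deadline fired;
        // anything else is a different init failure.
        console.error('Kafka init failed or timed out:', err.message);
        process.exit(1);
    });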