elodina / go_kafka_client

Apache Kafka Client Library for Go
http://www.elodina.net
Apache License 2.0
275 stars 74 forks source link

Fix zookeeper event watching, subscription, reconnection #159

Closed serejja closed 9 years ago

carnellj commented 9 years ago

Hi Serejja,

First of all thank you. I am still getting a panic when applied. (I will attach the panic to the back end). Lets give you a quick background on where we originally encountered the problem.

  1. We have a 3 node kafka cluster with a corresponding 3 node zookeeper cluster.
  2. Our dev op team was doing a rolling upgrade to the zookeeper cluster.
  3. When a node was being brought back up that the client was connected to, the code crashed with the panic documented in the original email.

Our app is a Go-based Microservice that acts as a proxy to several internal services. We are using Kafka to basically notify our service when some of the data we cache has been invalidated and needs to be evicted.

To reproduce the problem I have been putting our client against a single zookeeper node in our cluster and then taking the zookeeper node up and down. I can repeatedly reproduce the panic everytime I try to restart the zookeeper node in question. Note: The client never goes down while the zookeeper node is down. It happily processes messages and then writes errors that it cant commit the offset.

I have managed to write a workaround. (Please forgive me as it probably crude. I am new to Go and Zookeeper.) Basically my workaround was to add a channel to the ZooKeeperConfig struct in zk_coordinator. Then in the trySubscribeChange, I replace the panic with a push of a value to a channel.

if strings.HasPrefix(e.Path, newZKGroupDirs(this.config.Root, Groupid).ConsumerRegistryDir) { Info(this, "Trying to renew watcher for consumer registry") consumersWatcher, err = this.getConsumersInGroupWatcher(Groupid) if err != nil { this.config.ZKConnectInterrupt <- true } } else if strings.HasPrefix(e.Path, fmt.Sprintf("%s/%s", newZKGroupDirs(this.config.Root, Groupid).ConsumerApiDir, BlueGreenDeploymentAPI)) { Info(this, "Trying to renew watcher for consumer API dir") blueGreenWatcher, err = this.getBlueGreenWatcher(Groupid) if err != nil { this.config.ZKConnectInterrupt <- true } } else if strings.HasPrefix(e.Path, this.rootedPath(brokerTopicsPath)) { Info(this, "Trying to renew watcher for consumer topic dir") topicsWatcher, err = this.getTopicsWatcher() if err != nil { this.config.ZKConnectInterrupt <- true } }

I also do the same thing in the in the listenConnectionEvents:

for groupId, watch := range this.watches { _, err := this.SubscribeForChanges(groupId) if err != nil { this.config.ZKConnectInterrupt<-true } watch <- Reinitialize }

Finally when I create the consumer, I listen on this channel and then if there is a problem I close the consumer and the config.Coordinator.Disconnect()

This closes the old connection and re-initializes the consumer group allowing the application to keep going.

Like, I said I am not sure if the right solution. The challenge have is that when panics are used inside of spun go routines there is no way for the main application to "catch" them. (I could be wrong .... I am new to go).

Anyways, thanks again for responding to this. Appreciate the help and please let me know if there is anything I can supply to you to help debug this problem.

    Thanks,
        John

Here is the top of the panic.

panic: zk: could not connect to a server

goroutine 115 [running]: github.com/stealthly/go_kafka_client.(_ZookeeperCoordinator).trySubscribeForChanges.func1(0xc82013a8a0, 0xc82044c120, 0x749960, 0x33, 0xc8204c0420, 0xc8200320d8, 0xc820032078, 0xc820465880, 0xc820032090, 0xc8200320a8, ...) /Users/johncarnell1/work/edge-proxy/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:547 +0x9e0 created by github.com/stealthly/go_kafka_client.(_ZookeeperCoordinator).trySubscribeForChanges /Users/johncarnell1/work/edge-proxy/src/github.com/stealthly/go_kafka_client/zk_coordinator.go:581 +0xa48

serejja commented 9 years ago

Hi John,

given your steps I can't reproduce this on local environment. I tried all sorts of ZK start/stop patterns (e.g. quickly start/stop ZK; stop, wait 1-5 minutes, start again etc) and all I see is "connection refused" messages in stderr which are normal. No panics at all.

Can you please share your Kafka and ZK versions? (I doubt the problem is in Kafka version but it definitely can be ZK version).

I was testing this against Kafka 0.8.2.1 and Zookeeper 3.4.6

One more thing worth to mention here: IMO it makes sense to panic if the coordinator can't refresh watchers after some retries because this way it could miss Zookeeper notifications that should trigger rebalance and so the whole cluster could fail to rebalance. In your case I'd say you could increase the ZookeeperConfig.ZookeeperTimeout and ZookeeperConfig.MaxRequestRetries values so that coordinator tries harder before it panics. In general this panic looks like a desired behavior that prevents your cluster from inconsistent state.

carnellj commented 9 years ago

Hi Serhy,

Appreciate the efforts. Here is the info you request.

kafka = 0.8.2.1 zookeeper = 3.4.5

Were you running messages through the platform when you shutdown a zookeeper node or did you start the application and have a "quiet" environment. My application was running traffic through when I shutdown the zookeeper node. Also, I was running in a multinode environment, but for testing purposes only hook up a single to a single zookeeper node.

I would agree with you about the Panics, however the one thing that I don't like about Panics especially in asynchronous application is they take away the ability to intercept a problem and work around it. Our case, it crashes our server with little opportunity to respond. I do like the strategy approach you guys laid out and I am wondering if instead of calling a Panic, you invoke a Strategy that can be overridden by a developer if they need a to be notified.

I will take a look a the ZookeeperConfig and ZookeeperConfig.MaxRequestRetries. I will take a look in the code and see what the default values are for these.

Thanks again for the help, John Carnell

On Thu, Sep 17, 2015 at 5:01 AM, Serhey Novachenko <notifications@github.com

wrote:

Hi John,

given your steps I can't reproduce this on local environment. I tried all sorts of ZK start/stop patterns (e.g. quickly start/stop ZK; stop, wait 1-5 minutes, start again etc) and all I see is "connection refused" messages in stderr which are normal. No panics at all.

Can you please share your Kafka and ZK versions? (I doubt the problem is in Kafka version but it definitely can be ZK version).

I was testing this against Kafka 0.8.2.1 and Zookeeper 3.4.6

One more thing worth to mention here: IMO it makes sense to panic if the coordinator can't refresh watchers after some retries because this way it could miss Zookeeper notifications that should trigger rebalance and so the whole cluster could fail to rebalance. In your case I'd say you could increase the ZookeeperConfig.ZookeeperTimeout and ZookeeperConfig.MaxRequestRetries values so that coordinator tries harder before it panics. In general this panic looks like a desired behavior that prevents your cluster from inconsistent state.

— Reply to this email directly or view it on GitHub https://github.com/stealthly/go_kafka_client/pull/159#issuecomment-141016865 .

serejja commented 9 years ago

Hey John,

I've tried both quiet cluster and running traffic (approx. 5k dummy msg/sec) and still can't reproduce that. Not sure what I'm doing wrong :)

Regarding intercepting panics I think we could add something like a PanicHandler (or whatever would we call it) to the ConsumerConfig with the default implementation that would panic as it is now. This way we wouldn't break any existing clients AND would give the ability to handle abnormal situations like this more or less gracefully.

I think I'll create a separate PR with this behavior and let you know.

Thanks!

serejja commented 9 years ago

Uhm, and yes, could you please try testing this locally with ZK 3.4.6? Maybe the issue is hidden somewhere there?

serejja commented 9 years ago

Hey John, can you please take a look at #162 and reply what you think about this approach?

Thanks!

carnellj commented 9 years ago

Hi Serhey,

Thanks for the effort. Part of the challenge with asynchronous solutions is tracking down when things go bump in the night.

I really like your PanicHandler solution. It would fit nicely because what I do today is modified the ZookeeperConfig to accept it a channel that I listen to and then cleanly shutdown the consumer and disconnect the zookeeper connection. Ideally my goal is to detect that there is a problem with the consumer and either shut it down (without killing the entire service) or attempt to restart the consumer and let it get to a working Zookeeper node.

Thanks again and please let me know when the Pull Request is merged and I will pull down the changes and try them out.

  Thanks,
        John

On Sep 18, 2015 3:28 AM, "Serhey Novachenko" notifications@github.com wrote:

Hey John,

I've tried both quiet cluster and running traffic (approx. 5k dummy msg/sec) and still can't reproduce that. Not sure what I'm doing wrong :)

Regarding intercepting panics I think we could add something like a PanicHandler (or whatever would we call it) to the ConsumerConfig with the default implementation that would panic as it is now. This way we wouldn't break any existing clients AND would give the ability to handle abnormal situations like this more or less gracefully.

I think I'll create a separate PR with this behavior and let you know.

Thanks!

— Reply to this email directly or view it on GitHub https://github.com/stealthly/go_kafka_client/pull/159#issuecomment-141380467 .

carnellj commented 9 years ago

Hi Serhey,

Never mind I see you have already committed the changes. I will pull them down and look at it. Have a good weekend.

Thanks, John

On Fri, Sep 18, 2015 at 4:28 AM, Serhey Novachenko <notifications@github.com

wrote:

Hey John,

I've tried both quiet cluster and running traffic (approx. 5k dummy msg/sec) and still can't reproduce that. Not sure what I'm doing wrong :)

Regarding intercepting panics I think we could add something like a PanicHandler (or whatever would we call it) to the ConsumerConfig with the default implementation that would panic as it is now. This way we wouldn't break any existing clients AND would give the ability to handle abnormal situations like this more or less gracefully.

I think I'll create a separate PR with this behavior and let you know.

Thanks!

— Reply to this email directly or view it on GitHub https://github.com/stealthly/go_kafka_client/pull/159#issuecomment-141380467 .