elodina / go_kafka_client

Apache Kafka Client Library for Go
http://www.elodina.net
Apache License 2.0
275 stars 74 forks

consumer.Close() never finishes and always panics #140

Open davidzeng opened 9 years ago

davidzeng commented 9 years ago

I'm trying to Close() the consumer under certain circumstances, but whenever I call Close(), the worker manager never seems to finish shutting down.

13:39:52.1 | 2015-07-23/13:39:52 [INFO] [ConsumerFetcherRoutine-TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68-0] Closing fetcher
13:39:52.1 | 2015-07-23/13:39:52 [INFO] [ConsumerFetcherRoutine-TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68-0] Stopped fetcher
13:39:52.1 | 2015-07-23/13:39:52 [INFO] [TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68-manager] Successfully closed all fetcher manager routines
13:39:52.1 | 2015-07-23/13:39:52 [INFO] [TTV-0582.local:e989b4a8-8365-45f3-e3b1-eaf942271e68] Stopping worker manager...
13:44:52.1 | panic: Graceful shutdown failed

Are there prerequisites that need to be fulfilled before calling the function? I've tried raising the timeout to 5 minutes, but it makes no difference.

davidzeng commented 9 years ago

I did a bit more digging, and it seems my workerManagers aren't closing because of the lock held by startBatch. My callbacks for WorkerFailedAttemptCallback and WorkerFailureCallback are both doNotCommitAndStop.

I have the default 10 workers and 14 partitions running (14 worker managers). What seems to happen is that a worker stops, but some partitions are still in startBatch, which waits for an available worker; since my callback stops the workers, none are left and the call blocks indefinitely. I bumped the worker count up to 30 to try to free up more workers, but that wasn't enough, and I'm not sure adding more workers is the right fix anyway. Why doesn't Close() send a kill signal to the workers?

davidzeng commented 9 years ago

Another alternative would be for startBatch to block only for a bounded amount of time and then give up. I can see how that might not be the best solution, though.

baconalot commented 9 years ago

I presume you want to close the consumer from the messagehandler. I ran into the same problems, and my fix was to wrap the library: the messagehandler puts a "message processed" item on a channel whenever processing should continue, and a new main loop stops if nothing has been received on that channel for x seconds.

Not really nice, but for the most part it got rid of the fatals for me.

I agree this should be handled more gracefully.

davidzeng commented 9 years ago

I'm calling Close() on the consumer returned by kafka.NewConsumer(kafkaConfig). @baconalot What's the messagehandler you're talking about?

teou commented 9 years ago

It's a deadlock: startBatch and Close both try to hold the stopLock in the worker manager, while at the same time Close is waiting for startBatch to finish its select and receive from the managerStop channel.

I don't see why the wm.stopLock is necessary here.