ntent / kafka4net

C# client for Kafka
Apache License 2.0

How should I handle the situation when Kafka starts re-balancing data on brokers? #33

Closed tigerinus closed 8 years ago

tigerinus commented 8 years ago

If my code starts pushing 1 million records to Kafka as a job, and Kafka starts re-balancing data across its brokers (3 of them) before the job ends, the job stops pushing the remaining records for some reason.

Question: when Kafka re-balances the data, how should my code handle the situation? Or should it be transparent, so that my code doesn't need to do anything?

vchekan commented 8 years ago

The driver should handle rebalancing internally. There is a unit test for rebalancing while a consumer is running, but not for a producer: https://github.com/ntent-ad/kafka4net/blob/master/tests/RecoveryTest.cs#L658

First, you need to enable logging (https://github.com/ntent-ad/kafka4net/wiki/Troubleshooting#logger) and see whether the driver reports anything suspicious when this happens.

If you could reproduce your case in a unit test, that would be a great help, and I'd be able to fix it much quicker.

tigerinus commented 8 years ago

OK. Let me see if I can collect anything useful from logging and get back to this thread.

tigerinus commented 8 years ago

I've got the logs by following the troubleshooting instructions. The capture reported that a number of events were missed due to the bandwidth of something, which I have no clue how to solve. During event capturing my code uses almost 100% of the CPU. The producer stopped pushing records after 1.9 million out of 25 million had been pushed to Kafka. Previously, without logging enabled, it was able to push ~15 million.

I am not sure how useful the rest of the captured logs are. I have no idea how to map the "Time mSec" column to a real timestamp, so I can't pinpoint anything around the time when the producer stopped pushing to Kafka.

What exactly should I be looking for?

vchekan commented 8 years ago

You probably did tracing. At this stage plain logging should be enough: call kafka4net.Logger.SetupLog4Net() or kafka4net.Logger.SetupNLog() and take a look at the log file produced.
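
For illustration, a minimal sketch of what enabling logging at startup might look like. Only kafka4net.Logger.SetupLog4Net() comes from this thread; the use of log4net's BasicConfigurator, the Program class, and the placement of the call before the producer job are assumptions, and the project is assumed to reference both the kafka4net and log4net packages.

```csharp
using System;

class Program
{
    static void Main()
    {
        // Assumption: configure log4net with its default console appender.
        // A real setup would more likely define a file appender in the
        // application's log4net configuration so that a log file is produced.
        log4net.Config.BasicConfigurator.Configure();

        // From the thread: route the driver's internal logging to log4net.
        kafka4net.Logger.SetupLog4Net();

        // Placeholder: start the producer job here, then inspect the log
        // output around the moment the producer stops sending records.
        Console.WriteLine("Logging enabled; run the producer job now.");
    }
}
```

With NLog, the equivalent step would presumably be kafka4net.Logger.SetupNLog() together with an NLog configuration in place of the log4net one.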