ntent / kafka4net

C# client for Kafka
Apache License 2.0

Consumer flow control is broken #37

Closed cbenien closed 7 years ago

cbenien commented 8 years ago

Hi,

I've been debugging a problem for the last week where my consuming application would crash with an out-of-memory exception. What I observed:

I think I figured out the root cause of the problem. In the Consumer class, there is a BehaviorSubject

readonly BehaviorSubject<int> _flowControlInput = new BehaviorSubject<int>(1);

which drives the flow control. I think what goes wrong is that this subject is written to simultaneously from multiple threads. Once from the thread that increments the counter for new messages:

onMessage = onMessage.Do(msg =>
{
    var count = Interlocked.Increment(ref _outstandingMessageProcessingCount);
    _flowControlInput.OnNext(count);
});

And once from the thread that acknowledges the messages (called from my application):

public void Ack(int messageCount = 1)
{
    if(!Configuration.UseFlowControl)
        throw new InvalidOperationException("UseFlowControl is OFF, Ack is not allowed");

    var count = Interlocked.Add(ref _outstandingMessageProcessingCount, - messageCount);
    _flowControlInput.OnNext(count);
}

I'm not an expert on threading and concurrency with reactive extensions, but calling OnNext simultaneously from multiple threads is a bad idea, as discussed for example here: http://stackoverflow.com/questions/14396449/why-are-subjects-not-recommended-in-net-reactive-extensions

I could fix this pretty quickly by putting a lock statement around both calls to OnNext. But I'm not sure if that is the best possible solution to the problem. If anyone's interested, I can put together a pull request for this.
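
A minimal sketch of the lock-based idea, not the actual patch: `FlowControlSketch`, `OnMessageDelivered`, `FlowControl` and `_flowControlLock` are hypothetical names made up for illustration, and only the two fields are taken from the Consumer snippets above. It assumes the System.Reactive package is referenced. The counter update and the OnNext call sit inside the same lock, so values also reach the subject in the order the counter changed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical stand-in for the flow-control part of Consumer (not kafka4net code).
public sealed class FlowControlSketch
{
    // Assumption: one private lock object guards both the counter and the subject.
    private readonly object _flowControlLock = new object();
    private readonly BehaviorSubject<int> _flowControlInput = new BehaviorSubject<int>(1);
    private int _outstandingMessageProcessingCount;

    public IObservable<int> FlowControl => _flowControlInput;

    // Stands in for the onMessage.Do(...) callback on the receiving thread.
    public void OnMessageDelivered()
    {
        lock (_flowControlLock)
        {
            _outstandingMessageProcessingCount++;
            _flowControlInput.OnNext(_outstandingMessageProcessingCount);
        }
    }

    // Stands in for Consumer.Ack, called from application threads.
    public void Ack(int messageCount = 1)
    {
        lock (_flowControlLock)
        {
            _outstandingMessageProcessingCount -= messageCount;
            _flowControlInput.OnNext(_outstandingMessageProcessingCount);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var sketch = new FlowControlSketch();
        var seen = new List<int>();

        // The callback runs inside the lock, so the plain List is safe here.
        using (sketch.FlowControl.Subscribe(n => seen.Add(n)))
        {
            // Deliver and acknowledge 1000 messages each from two threads.
            Parallel.Invoke(
                () => { for (var i = 0; i < 1000; i++) sketch.OnMessageDelivered(); },
                () => { for (var i = 0; i < 1000; i++) sketch.Ack(); });
        }

        // The last value pushed to the subject matches the final counter.
        Console.WriteLine(seen[seen.Count - 1]); // prints 0
    }
}
```

One thing to keep in mind with this approach: subscribers run while the lock is held, so whatever reacts to the flow-control values should be short and should not block.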

Or am I doing something wrong in my application? Am I expected to call Ack on the original thread? Would that solve it?

vchekan commented 8 years ago

@cbenien thank you for this report and the patch.

> OnNext simultaneously from multiple threads is a bad idea

This definitely looks like a bug. I will try to find time this week to take a better look and will do something about it.

thunderstumpges commented 7 years ago

Reviewed the fix by @cbenien and it looks reasonable. As no one here has the bandwidth to think about any other "more preferred" way to do it, I made the decision to merge the pull request and publish v2.0.1, along with fixes for a couple of other issues.