Jroland / kafka-net

Native C# client for Kafka queue servers.
Apache License 2.0
482 stars 232 forks source link

Consumer hangs and no message is received #74

Open avoxm opened 9 years ago

avoxm commented 9 years ago

Hi,

I am implementing a simple consumer using the sample code on project page and it looks like there's an issue there. The code is being blocked by consumer.Consume() and it stays in that state "forever". In parallel I run a console consumer and a python version of the consumer. Once a new message is publish both and console and python consumers work fine and I can see the message, but kafka-net does not react in any way.

Here is my code :

        var options = new KafkaOptions(new Uri("http://test:9092"));
        var brokerRouter = new BrokerRouter(options);
        var consumerOptions = new ConsumerOptions(topic, brokerRouter);
        var consumer = new Consumer(consumerOptions);

        //Consume returns a blocking IEnumerable (ie: never ending stream)
        foreach (var message in consumer.Consume())
        {
            Console.WriteLine("Response: P{0},O{1} : {2}",
                message.Meta.PartitionId, message.Meta.Offset, message.Value);
        }
bmittenzwey commented 9 years ago

I'm seeing a similar issue. It seems to happen when a Kafka queue has dropped old data, but I'm trying to read from the beginning. (i.e. index 0 on each partition)

The problem looks like it is coming from the ConsumeTopicPartitionAsync method on line 157.

if(response != null && response.message.count > 0)
{
     HandleResponseErrors(fetch, response);

because of the check for message count > 0, the Index out of range exception will never be raised and the current index for this partition will remain at 0, trying over and over and over and over

To get it to behave the same as the example java consumer, I had to make two small changes: 1) remove the && response.message.count > 0 section of line 157 2) In the signature of GetTopicOffsetAsync, I changed the default maxOffsets from 2 to Int32.MaxValue This second one makes sure you will get the smallest available offset when you are trying to run from the beginning.

You can also avoid adjusting the consumer class if you just call GetTopicOffsetAsync with a large value for maxOffsets in your program, then call consumer.SetOffsetPosision(offsets); with an offset list that you fill with the Min offsets returned by GetTopicOffsetAsync

aabs commented 9 years ago

I'm experiencing delays in the consumer that seem to be at least partially related to retrieval of topic metadata in the BrokerRouter during the call to Consume. I posted a question on SO here. Not sure if the issues are related...

avoxm commented 9 years ago

Nothing really helped .. the consumer is still not pulling any messages.It is quite confusing since it works with kafka4net.

stephendrew commented 8 years ago

I am also witnessing this with a very simple .NET consumer and 4 .NET producers on one topic. At a particular point, the consumer hangs even though the producers are producing. It will always hang at the same place if restarted.

ah- commented 8 years ago

I'm working on a fully featured Kafka client for .net, which should not have that issue: https://github.com/ah-/rdkafka-dotnet

stephendrew commented 8 years ago

Very interesting Andreas, thanks. I will take a closer look. Let me know if I can help out in anyway, I enjoy porting C++ code to C# :)

avoxm commented 8 years ago

@ah- looks interesting... Do you have a roadmap ? I have started working on my own version would be happy to combine efforts into one singe lib.

ah- commented 8 years ago

@stephendrew @avoxm That sounds fantastic, contributions are more than welcome. Did you look at building something based on librdkafka yourself or were you planning to implement everything in C#?

As for the roadmap, it's already fairly complete thanks to most of the hard work being done by librdkafka. I'm currently waiting on the next release of librdkafka to do my first proper release, which should happen within the next couple of weeks.

I'd love for someone to review the API and interfaces, so if you have some time to have a look at that I would really appreciate it! I'm aiming for something close to the new Java API while being relatively idiomatic C#.

It could also use some more testing, and there's a couple of features that I would like to implement like well integrated Kerberos/Auth and SSL support. Further down the line would be integrated serialisation instead of returning byte[] and niceties like that.

kavyashivakumar commented 7 years ago

hi is kerberos supported now for windows?