Jroland / kafka-net

Native C# client for Kafka queue servers.
Apache License 2.0

Trying to consume from one topic and produce to another hangs #59

Open warrenfalk opened 9 years ago

warrenfalk commented 9 years ago

It appears to be impossible to use a BrokerRouter for producing if it is already being used for consuming on another topic. The following code illustrates the problem I'm seeing.

Is this invalid use of a BrokerRouter?

```csharp
using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;

namespace kafka_net_test
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new KafkaOptions(new Uri[] { new Uri("kafka://localcluster:9092"), });
            var brokerRouter = new BrokerRouter(options);

            long offset = 0;
            var producer = new Producer(brokerRouter, 1);
            producer.BatchSize = 1;
            producer.BatchDelayTime = TimeSpan.FromDays(1);

            {
                // Prime topic with a message if necessary
                //var task = producer.SendMessageAsync("Test", new Message[] { new Message("Test Message") });
                //task.Wait();
                //offset = task.Result[0].Offset;
            }

            // Start a consumer to get that message
            var consumerOptions = new ConsumerOptions("Test", brokerRouter);
            consumerOptions.MinimumBytes = 1;
            consumerOptions.BackoffInterval = TimeSpan.Zero;
            consumerOptions.MaxWaitTimeForMinimumBytes = TimeSpan.MaxValue;
            var consumer = new Consumer(consumerOptions);
            consumer.SetOffsetPosition(new OffsetPosition { PartitionId = 0, Offset = offset });
            var messageStream = consumer.Consume();

            foreach (var message in messageStream)
            {
                // Send message to another topic
                var task = producer.SendMessageAsync("Test2", new Message[] { new Message { Value = message.Value } });
                // ------------------------------------
                // If batch size is 1:
                // Blocks above and never gets here (until eventual timeout)
                // Message never shows up in topic
                // ------------------------------------
                task.Wait();
                // ------------------------------------
                // If batch size is greater than 1:
                // Blocks above and never gets here (until eventual timeout)
                // Message never shows up in topic
                // ------------------------------------
            }
        }
    }
}
```
warrenfalk commented 9 years ago

Never mind. It appears that it is actually Kafka that fails to send the response to the metadata request that follows the fetch request.

warrenfalk commented 9 years ago

OK, after more investigation, there is in fact a problem with kafka-net. A Kafka broker (0.8.2.1, at least) will not process a produce request while a fetch request on the same connection is waiting for data. That matters for kafka-net because, as soon as the consumer yields the results of one fetch request, it immediately sends a new fetch request from another thread. As a result, the BrokerRouter cannot be used to publish anything that ends up being sent to the same broker.

Note: I think this is really poor behavior on Kafka's part. Its network protocol is described as though a connection could serve other requests while a fetch is pending (with correlation IDs and all), but it does not seem to work that way.

warrenfalk commented 9 years ago

In fact, the protocol documentation states that it does not work this way, because all requests on a given socket are processed in order:

"The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted."
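To make the ordering rule concrete, here is a toy C# model (not Kafka's or kafka-net's code) of a connection that answers requests strictly in the order they were sent, with one request in flight. `OrderedConnection` and `HeadOfLineDemo` are hypothetical names, and the 500 ms long-poll is an assumed stand-in for the repro's `MaxWaitTimeForMinimumBytes = TimeSpan.MaxValue`, which in practice means the fetch may never return. On a shared connection the cheap produce request is answered only after the fetch; on its own connection it is not queued behind the fetch, since the guarantee quoted above applies per connection.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Toy model of ONE broker connection (hypothetical; not Kafka or kafka-net code).
// Each request starts only after the previous request on this connection has
// been answered, so requests are processed strictly in the order they were sent,
// with a single request in flight.
public sealed class OrderedConnection
{
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask; // last request queued on this connection

    public Task<TimeSpan> SendAsync(TimeSpan serviceTime, Stopwatch clock)
    {
        lock (_gate)
        {
            Task<TimeSpan> response = ProcessAfter(_tail, serviceTime, clock);
            _tail = response;
            return response;
        }
    }

    private static async Task<TimeSpan> ProcessAfter(Task previous, TimeSpan serviceTime, Stopwatch clock)
    {
        await previous;                // wait until every earlier request has been answered
        await Task.Delay(serviceTime); // the broker works on (or long-polls) this request
        return clock.Elapsed;          // moment the response goes back to the client
    }
}

public static class HeadOfLineDemo
{
    // A long-poll fetch followed immediately by a short produce request.
    public static (TimeSpan FetchDone, TimeSpan ProduceDone) Run(bool shareConnection)
    {
        var clock = Stopwatch.StartNew();
        var consumerConnection = new OrderedConnection();
        var producerConnection = shareConnection ? consumerConnection : new OrderedConnection();

        // Assumed 500 ms stand-in for the fetch's MaxWaitTimeForMinimumBytes.
        Task<TimeSpan> fetch = consumerConnection.SendAsync(TimeSpan.FromMilliseconds(500), clock);
        // The produce request itself is cheap (10 ms in this model).
        Task<TimeSpan> produce = producerConnection.SendAsync(TimeSpan.FromMilliseconds(10), clock);

        Task.WaitAll(fetch, produce);
        return (fetch.Result, produce.Result);
    }

    public static void Main()
    {
        var shared = Run(shareConnection: true);
        var separate = Run(shareConnection: false);
        Console.WriteLine($"shared connection:    produce answered after {shared.ProduceDone.TotalMilliseconds:F0} ms");
        Console.WriteLine($"separate connections: produce answered after {separate.ProduceDone.TotalMilliseconds:F0} ms");
    }
}
```

Running it should show the shared-connection produce waiting out roughly the whole fetch, while the separate-connection produce comes back within tens of milliseconds; exact numbers depend on timer resolution.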

warrenfalk commented 9 years ago

See discussion here: http://www.mail-archive.com/dev@kafka.apache.org/msg27430.html

The reply there suggests that consumers and producers should not reuse the BrokerRouter's connection at all; each instance should open its own connection for fetch and produce requests. The protocol wiki page's claim that a client only ever needs one connection per broker is no longer true and is going to be removed, and the new Java consumers will each maintain their own connections.

Jroland commented 9 years ago

Thanks, Warren, I'll have a close read of that. I got pretty sick over the last several weeks, so I am just getting back into things now.