confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Producer backpressure semantic is not async-friendly #476

Closed vchekan closed 5 years ago

vchekan commented 6 years ago

Description

I am thinking about how to implement a producer correctly in a high-load async environment. The producer has two main modes: callback-based (IDeliveryHandler) and Task-based. Both have a blockIfQueueFull parameter.
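For reference, these are the two call shapes as they appear later in this thread (a sketch inferred from the examples below; topic, msg, and deliveryHandler are placeholders):

// Task-based: the returned Task completes when the delivery report arrives.
var deliveryTask = producer.ProduceAsync(topic, null, msg, true);   // last argument: blockIfQueueFull

// Callback-based: the IDeliveryHandler receives the delivery report instead.
producer.ProduceAsync(topic, null, msg, true, deliveryHandler);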

Let's start with Task mode. If I do the following:

foreach (var n in batch) {
    // awaiting each ProduceAsync serializes sends: only one message is ever in flight
    var message = await producer.ProduceAsync(topic, null, n, true).
        ContinueWith(t => Interlocked.Increment(ref counter));
}

then I effectively produce one message per batch, which is a no-go. So I need to accumulate messages and await the whole batch, which leads to something like this:

var stream = Enumerable.Range(1, batchSize).Select(i => i.ToString());
var tasks = stream.Select(async msg => await producer.ProduceAsync(topic, null, msg, true).
       ContinueWith(t => Interlocked.Increment(ref counter)));
await Task.WhenAll(tasks);

But this is problematic too. If I have an infinite stream of source messages, then I do not know when my "tasks" will end (never), so I'll have to do LINQ batching.
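For context, a rough sketch of what that LINQ batching might look like (assuming a chunking helper such as .NET 6's Enumerable.Chunk; producer, topic, counter, and batchSize are the same as in the snippets above, and stream stands for the possibly infinite source):

foreach (var chunk in stream.Chunk(batchSize))   // partition the (possibly infinite) stream
{
    var tasks = chunk.Select(msg =>
        producer.ProduceAsync(topic, null, msg, true)
                .ContinueWith(t => Interlocked.Increment(ref counter)));
    await Task.WhenAll(tasks);                   // await one bounded batch at a time
}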

Another worry of mine is that ProduceAsync will block, but I do not quite understand how this interplays with the await portion of await producer.ProduceAsync. Is there any chance that the task scheduler will see a blocked task and keep scheduling new threads, because from the scheduler's perspective it is just a long-running task and it will try to run as many parallel tasks as possible without being aware that this blocked task is actually backpressure? Would this lead to thread-pool exhaustion?

Bottom line is, I do not see how to use the Task-based API, because we need to await two different kinds of async flows: backpressure and message delivery. And sync and async do not mix well. My intuition is that the librdkafka blocking call should be converted into a task which can be awaited. Does this sound reasonable?

Let's consider the delivery handler option. It is more appealing to me because generating a task per message and then awaiting every one of them sounds sub-optimal. Start with this implementation:

        class DeliveryHandler : IDeliveryHandler<Null, String>
        {
            public int counter = 0;
            public bool MarshalData => true;

            public void HandleDeliveryReport(Message<Null, string> message)
            {
                Interlocked.Increment(ref counter);
            }
        }

........
            var deliveryHandler = new DeliveryHandler();
            Task.Run(() =>
            {
                using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
                {
                    var stream = Enumerable.Range(1, batchSize).Select(i => i.ToString());
                    foreach (var msg in stream)
                    {
                        producer.ProduceAsync(topic, null, msg, true, deliveryHandler);
                    }
                }
            }).Wait();

Now producing is synchronous and the batch-overflow blocking call is propagated to the caller naturally. But there are problems: if you are operating in an async-heavy environment, you still want buffer overflow to be exposed as an awaitable entity, so you have to do more work to convert it to async. The second problem is that now I have no way of knowing about delivery failures. How do I handle failed messages in the delivery handler?
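One way to get an awaitable handle on that blocking loop is to keep it on the thread pool and await the task instead of calling .Wait() (a minimal sketch based on the snippet above; this only moves the blocking onto a worker thread, it does not make the backpressure itself awaitable):

var produceTask = Task.Run(() =>
{
    using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
    {
        foreach (var msg in stream)
        {
            // blockIfQueueFull = true: backpressure blocks this worker thread,
            // while the async caller only awaits produceTask
            producer.ProduceAsync(topic, null, msg, true, deliveryHandler);
        }
    }
});
await produceTask;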

mhowlett commented 6 years ago

Thanks @vchekan for your well-thought-through question and suggestions. I want to think this through carefully and provide a good response (I likely won't get to this immediately). We need to make sure we have all the details right here before we go to v1 (see #478).

robinreeves commented 6 years ago

I think the way this will behave is: if you are pushing messages faster than they can be sent, you will create Tasks until you run out of memory. However, I would think the Tasks at the tail would block until there is space in the queue, and then complete, up until the point the stack overflows. This doesn't seem like a practical problem given the code you present. If you are loading an infinite stream, you'd never want to await it, because by definition the Tasks would never all end, which is why you implemented a batch size in the first place. Ultimately, if you are producing messages faster than they can be sent, you'll need to do something to respond to it. You could also go the route of not blocking and having retry logic around errors, which you'd probably need anyway. You can even catch the specific "queue full" error and change the state of your app so it is aware of the situation and can throttle sending.

Also, the delivery handler isn't any different from the Task approach in terms of determining errors. The Task itself won't throw an exception on error; you'd need to interrogate the Message class in either scenario. The only difference is that with Tasks you can await a specific set of Tasks so you know they have all completed. I think in most truly async scenarios this isn't required.

I'm not sure what you are trying to accomplish, but I'm moving messages from one async messaging paradigm (Service Broker) to Kafka and I approach it a bit differently. Since everything is by definition async, I don't need to wait for operations to complete, but rather handle the delivery report and mark messages as acknowledged or errored. I pull a batch of messages from my source and write them this way:

private void ProduceKafkaMessage(Producer<long, string> producer, TransmissionQueueMessage message, TransmissionQueueDeliveryHandler deliveryHandler)
{
    producer.ProduceAsync(message.KafkaTopicName, message.TransmissionQueueId, message.Message, deliveryHandler);
}

I do a little bit of other work in this function, but it isn't relevant here. Basically, I get a batch from my datasource and loop over it calling this function to send the batch to Kafka. The delivery handler looks like this:

public void HandleDeliveryReport(Message<long, string> message)
{
    try
    {
        if (!message.Error.HasError)
        {
            SetMessageReceiveAknowledged(message.Key, message.Topic, message.Partition, message.Offset.Value, message.Timestamp.UtcDateTime);
        }
        else
        {
            MessageFailedReceive(message.Key, (int)message.Error.Code, message.Error.IsBrokerError, message.Error.IsLocalError, message.Error.Reason);
        }
    }
    catch (Exception ex)
    {
        string errorMessage = string.Format("Failed to process result from message with TransmissionQueueId: {0}", message.Key);
        Logger.LogException(errorMessage, ex);
    }
}

Once a message has been processed, it can be purged, or if there is an error it can be resent. I could have the same problem with back pressure if messages are being generated too fast, but I haven't even gotten close to hitting the queue size limit.

It is also worth noting that when I developed this I was using Tasks and .ContinueWith(), and that is actually what is currently in production, because I wasn't aware of the delivery handler option.

robinreeves commented 6 years ago

I have my own questions about how the producer works. This doesn't seem like the proper place to post such questions (it seems more like a bug-reporting system), but I haven't found any message boards for this topic. If there is such a place, please point me to it, but since what I'm asking is somewhat related to this issue, I'm going to post here.

I'm noticing the bottleneck in my process seems to be the callback from the ProduceAsync() method. When I'm sending at a high rate, the number of acks pending continues to grow. The issue could certainly be our Kafka instance, and I'm not in charge of that system, but I also notice some interesting patterns that look to me like some kind of throttling in the API. If the process is stopped and I've built up a backlog, once I start it the number of unacknowledged messages begins to climb. I'm only talking about tens of thousands at most, so nothing obscenely huge, but after a few minutes it seems to level off and then drop down to around 1,200 unacknowledged messages and stay there. I sent about 2.5 million messages a few days ago and saw this pattern: after the spike at the beginning, the unacknowledged messages sat around 1,200 for 8-10 hours.

I can think of quite a few reasons this might be, especially since I'm going through two layers, C# and C, before I even get to Kafka. I didn't see any throttling code in the C# wrapper, but I assume something in the C layer could be doing this. There weren't anywhere near enough messages to cause the ProduceAsync() call to block, considering I'd had at least ten times that number of messages in process at the beginning. Maybe Kafka spun up some extra threads to handle the increased volume, which is why it leveled off, but I haven't gone that deep yet. I thought the code in my handler might be the issue, but I changed it to increment an in-memory counter and then write a file after a large batch of messages had been acknowledged, which should be lightweight. It was 60-90 seconds between the last call to ProduceAsync() and the final ack.

Is this latency expected? My Kafka instance is in the cloud, so network latency could be an issue. I assume the latency could be messages in the local queue taking time to actually be sent to Kafka, rather than the time to ack. Is there an easy way to determine what is going on with local queueing? I'm collecting the stats, but I'm not sure what in there would help me diagnose this.

edenhill commented 6 years ago

@robinreeves What I think you are seeing is a problem with small batches, resulting in large per-message overhead and lower throughput rate. This is mostly fixed in librdkafka v0.11.4 (there will be a corresponding .NET release soon, RC today).

The new code properly takes network backpressure into consideration when accumulating batches, increasing the number of messages per batch and thus reducing the overhead, leading to higher throughput.

robinreeves commented 6 years ago

Thanks @edenhill. That seems like a likely cause. I noticed the configuration value queue.buffering.max.ms defaults to 0. Does this mean there is no waiting to batch messages, and could I improve performance in the current version by increasing this value, or am I sending fast enough that there is a backlog being batched incorrectly?

edenhill commented 6 years ago

Yes, increasing that value will improve batching.

robinreeves commented 6 years ago

Upping queue.buffering.max.ms has improved performance, and since a few seconds of latency isn't a big issue for me, it is a viable solution. The configuration property batch.num.messages defaults to 10,000, and I'm not producing messages fast enough to hit that limit, so I don't think changing that will improve performance. I assume compressing the message sets might help here, right?
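For reference, these are all librdkafka configuration properties set through the producer config; a sketch using the dictionary-style config from the earlier examples (broker address and values are illustrative, not recommendations):

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "localhost:9092" },  // placeholder
    { "queue.buffering.max.ms", 50 },           // linger so batches can fill (defaults to 0 here)
    { "batch.num.messages", 10000 },            // max messages per batch (the default)
    { "compression.codec", "lz4" }              // compress message sets
};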

As an aside, I noticed there is a callback for throttling in librdkafka. Is this something that could be exposed in the C# wrapper as an EventHandler like stats, error, or log? I didn't dig into the C code to see if the callback provides anything useful, but it was sitting right next to the other exposed callbacks in the documentation so I thought it could be something of interest.

edenhill commented 6 years ago

The throttle callback is emitted each time the broker throttles a request.

vchekan commented 6 years ago

@robinreeves thanks for your write-up. I am performing a general evaluation of the Confluent client, and the scenario I have in mind is streaming, i.e. a chain of computations connected with intermediate topics (like Kafka Streams). If the output stream is slower than the input (for example, the input topic has many more partitions than the output), then we will read faster than we write and a buffer overflow will happen in a matter of minutes. Imagine a situation where a big datacenter application needs to recalculate a week's worth of data, or attempts to restart after an hour of downtime, and you are looking at a sleepless night and a cool cocktail party story :)

A pattern which addresses this problem can be seen in .NET Dataflow blocks, namely BatchBlock: https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.batchblock-1?view=netcore-2.0 The semantics of its SendAsync are different: completion indicates that the message has been accepted into the buffer, while a task that stays incomplete means that the buffer is full. https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.sendasync?view=netcore-2.0#System_Threading_Tasks_Dataflow_DataflowBlock_SendAsync__1_System_Threading_Tasks_Dataflow_ITargetBlock___0____0_
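To make the referenced SendAsync semantics concrete, here is a small self-contained sketch with a bounded BatchBlock (the batch size, capacity, source stream, and downstream consumer are all illustrative):

using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;   // NuGet package: System.Threading.Tasks.Dataflow

var stream = Enumerable.Range(1, 1000000).Select(i => i.ToString());

// Bounded batching stage: holds at most 10,000 queued messages.
var batcher = new BatchBlock<string>(
    1000,                                                         // messages per emitted batch
    new GroupingDataflowBlockOptions { BoundedCapacity = 10000 });

// Downstream consumer of whole batches (a stand-in for the Kafka produce step).
var sender = new ActionBlock<string[]>(batch => Console.WriteLine($"sending {batch.Length} messages"));
batcher.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var msg in stream)
{
    // SendAsync completes as soon as the block accepts the message; while the bounded
    // buffer is full, the returned task simply stays incomplete - that pending task is
    // the backpressure signal, and no thread is blocked.
    await batcher.SendAsync(msg);
}

batcher.Complete();
await sender.Completion;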

@edenhill @mhowlett I am thinking about a Producer API like the following:

foreach(var msg in stream) {
    // await producer buffer availability, NOT message delivery
    await producer.Send(msg, handler);
}

The problem with the current design is that Send returning individual per-message tasks makes the caller do non-trivial work to convert a stream of tasks into batches to await (how big should the await batches be? how does the size of the await batches correlate with the librdkafka internal batch size? etc.). Moreover, this API invites you to write await producer.Send(...), which does not do what you want at all: one message per batch.

@robinreeves You are using Message.Error in the handler. I was not sure whether it covered only Kafka transport errors or all possible errors. But after seeing your example I took another look at the source code, and it looks like it is all errors, so perhaps this is the recommended way to handle errors, via HandleDeliveryReport.

robinreeves commented 6 years ago

I'm thinking I was incorrect about overflowing the stack. Since it is ProduceAsync that blocks, your IEnumerable of Tasks probably won't return, because the call for each element blocks. At first I didn't understand why you were saying this didn't fit the async paradigm, but now I get the point.

This is a tough problem all around. The Blocks example you reference does, I think, have the behavior I described: if you are over-producing for long periods, you'll fill the stack with Tasks waiting for space in the buffer. I get why this is better from a programming-paradigm perspective, but in the end the difference probably isn't much. In either situation, you need to detect and react to the fact that you are producing faster than you are sending, or you will eventually fall so far behind that you'll run out of resources. If it is caused by a temporary spike in production, eventually you catch up either way.

I still think the solution you are searching for requires you to avoid awaiting Tasks. You send as much as you can as fast as you can and don't worry about it. If you need to ack to your source, do it in the callback. If the ProduceAsync call blocks, it is doing so for good reason. Back to the Block example, even though the method isn't blocking, that Task and data are stored somewhere, and you've just turned the stack into an unlimited buffer, until it is full. This kind of defeats the purpose of creating a buffer with limited size.

Don't get me wrong, this approach has all kinds of issues too. For example, if you don't wait long enough when you flush, or the process crashes, you'll be left with unacknowledged messages. However, a crash could produce the same results even while awaiting Tasks. You are trading some guarantees for massive scale with Kafka. If you need guarantees, use Service Broker. The nice thing is the blocking works to your advantage, because you'll never overflow the internal buffer and you'll be throttling your produced messages, but that may not be what you want either.

We're trying to accomplish similar things, I think. Technically, I'm in more of a traditional producer/consumer paradigm, not true streaming, but the distinction is probably minimal for this discussion because I receive an unlimited number of messages and write them to Kafka. I'm still not sure about Tasks versus the DeliveryHandler. I just noticed the handler yesterday and have a lot of questions. The fact that it runs on the poll thread concerns me, and I didn't notice a huge resource difference between the two. Just using .ContinueWith() without an await is functionally the same as the handler.
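A sketch of that last point, using the same message object and HandleDeliveryReport shown earlier in this thread (the Task-returning ProduceAsync overload is assumed here):

// Fire-and-forget: the continuation plays the role of the delivery handler,
// and nothing ever awaits the task chain.
producer.ProduceAsync(message.KafkaTopicName, message.TransmissionQueueId, message.Message)
        .ContinueWith(t => HandleDeliveryReport(t.Result));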

I'm not sure there are trivial solutions, and no matter how this is implemented there is a lot of work to do to avoid all those interesting cocktail party stories. Also, you go to cocktail parties where people want to hear about this nonsense? :)

mhowlett commented 5 years ago

Thanks again @vchekan and @robinreeves for your thoughts here. I've had a chance to think this through now, and my thoughts are below.

I now believe that both ProduceAsync and BeginProduce (as it's now called in 1.0) should never block - I'm about to make this change on the 1.0-experimental branch. They should always throw a KafkaException if invoked while the librdkafka message queue is full (error code Local_QueueFull); the user can catch this and wait in a synchronous or asynchronous way themselves if they want.

The blocking behavior was intended as a convenience, but I don't think it is one. First, there is your excellent discussion wondering about the interplay with the scheduler. Second, you wouldn't want to use the blocking behavior in, say, a web request handler, since you don't want to block the thread. Third, it's not typical, so even if users are aware of the behavior (note: it's no longer a parameter on the method, it's a config parameter, which is more hidden), they need to think through an unusual pattern (as you are doing).

I don't think the librdkafka blocking call should be converted into a Task either, since the point of having limits on the librdkafka queue size is (presumably?) to limit memory usage. If we want to have more simultaneous stuff in flight (which is effectively what awaiting a blocked call would achieve), just increase the limits in librdkafka instead.
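For completeness, the librdkafka queue limits referred to here are also just config properties; a sketch with the dictionary-style config (values are illustrative, and raising them trades memory for headroom):

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "localhost:9092" },    // placeholder
    { "queue.buffering.max.messages", 500000 },   // default is 100000
    { "queue.buffering.max.kbytes", 2097151 }     // cap on total bytes in the local queue
};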

I see the Task-based method as mostly useful in scenarios where you want to await each task separately, e.g. web requests where you want parallelism across many simultaneous requests. If you find yourself wanting to batch up results, I think it's the wrong fit. I think something along the lines of the below is how to best produce an infinite stream at high speed:

Action<DeliveryReport<Null, string>> handler = (DeliveryReport<Null, string> dr) => { count += 1; }; // no interlock required.

using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
    var stream = Enumerable.Range(1, bigNumber).Select(i => i.ToString());
    foreach (var msg in stream)
    {
        while (true)
        {
            try
            {
                producer.BeginProduce("my_topic", new Message<Null, string> { Value = msg }, handler);
                break;
            }
            catch (KafkaException e)
            {
                if (e.Error.Code == ErrorCode.Local_QueueFull)
                {
                    Thread.Sleep(100);
                    continue;
                }

                Console.WriteLine(e.Error);

                // a non-retryable error occurred. What is appropriate to do here will be
                // application specific and may depend on the error code.
                break;
            }
        }
    }

    producer.Flush(TimeSpan.FromSeconds(500));
}

Another thing to note is that the Task-based methods are less performant / consume more resources than the Action<DeliveryReport> methods, so if you care about performance, BeginProduce is better. But the difference isn't that much.

I'm going to close this now since it's been open a long time, but feel free to comment further / re-open if you don't agree with the new proposed behavior when the librdkafka queue fills up.

hodzanassredin commented 2 years ago

I'm trying to figure out why this code hangs (deadlocks). Do I have to use Thread.Sleep?

        public async Task SendMessage(string key, TMessage msg)
        {
            //logger.LogDebug("Sending message into topic {_topic} key {key}", _topic, key);
            var m = new Message<string, TMessage> { Key = key, Value = msg };
            if (_producer == null) {
                logger.LogWarning("Producer for topic is null. Trying to recreate {@details}", new { topic = _topic });
                PreStart();
            }
            while (true)
            {
                try
                {
                    _producer.Produce(_topic, m);
                    break;
                }
                catch (ProduceException<string, TMessage> ex)
                {
                    if (ex.Error.Code == ErrorCode.Local_QueueFull)
                    {
                        await Task.Delay(50).ConfigureAwait(false);//this line is a problem
                    }
                    else throw;
                }
            }

            metrics.IncMessagesSent(_topic);
        }
            // calling code:
            foreach (var state in states)
            {
                await deviceStateKafkaProducer.SendMessage(state.DeviceId, state);
            }
            deviceStateKafkaProducer.Flush();//hangs

P.S. I can reproduce this problem only on Linux.

Update: This happens in xunit tests with AsyncTestSyncContext and ThreadPoolScheduler.

Update 2: After investigation I found that there are a lot of error messages: Kafka producer error: "rdkafka#producer-10" Error { Code: Local_Transport, IsFatal: False, Reason: "localhost:29099/bootstrap: Connect to ipv4#127.0.0.1:29099 failed: Connection refused (after 0ms in state CONNECT)", IsError: True, IsLocalError: True, IsBrokerError: False }

Update 3: It was a problem with the latest version of the confluentinc/cp-kafka image. :-)