confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

High Performance Producer and General Error Handling #521

Closed DarrenHart1989 closed 6 years ago

DarrenHart1989 commented 6 years ago

Description

I have a question regarding high-performance producing. We currently implement the Producer as a single shared instance within a web service (ASP.NET WebApi). We initially just had a simple call to ProduceAsync with a continuation that would log an error when something had faulted. However, we noticed that we would constantly get Local Message Timed Out errors thrown, with no other information, and only some of the messages would get through to the Kafka topics they were being sent to.

We have changed to implement our own timeout logic, where we effectively keep hold of the task, wait for a period of time, and if the task has not completed during that time we assume that the Kafka brokers are offline.

Looks similar to this:

public Task<Message<Null, string>> SendOne(string data)
{
    // send data if we are online or should reconnect
    _mainLog.Info($"IsOnline: {IsOnline} ShouldRetryConnection: {ShouldRetryConnection}");
    if (IsOnline || ShouldRetryConnection)
    {
        // start the async produce
        var task = _producer.ProduceAsync(_topicName, null, data, -1);

        // wait for it to complete or time out - this doesn't stop the task
        IsOnline = task.Wait(SendTimeout);

        // were any exceptions thrown?
        if (task.Exception != null)
        {
            // there were, so switch offline
            IsOnline = false;
            foreach (Exception e in task.Exception.Flatten().InnerExceptions)
                _errLog.Error($"[{_name}] failed to execute kafka producer ProduceAsync", e);
        }

        // either the task faulted or we exceeded our timeout waiting for it,
        // so we are offline from now on
        if (!IsOnline)
        {
            // mark when we went offline
            OfflineFrom = _clock.UtcNow;

            // log the data that we have failed to deliver - in order to retry later
            LogMessage(data);
        }

        // still return the task as there might be some continuation further up
        // the chain to log out an error if needed
        return task;
    }

    // log the data that we have failed to deliver - in order to retry later
    LogMessage(data);

    // we are offline and should not retry the connection yet
    throw new InvalidOperationException($"[{_name}] failed to send messages to kafka. Online: {IsOnline}. Retrying: {ShouldRetryConnection}");
}

The above is effectively a backoff mechanism for when Kafka cannot handle our spikes in message production.

So I am really looking for guidance to the following questions, or even if you can shed some light to help me better understand the issues that are happening.

  1. Is this the correct way to handle producing, especially with regard to loss of messages?
  2. Is there a better way? Or should the producer actually handle these errors for us gracefully?
  3. Do you have a recommended way of using the client and how a producer should be set up? Obviously all applications are different, but is there a standard set of rules which should be applied when using the client?
  4. Do you have a recommended way to produce messages efficiently?
  5. Should we be handling errors/exceptions in a different way? Is there a recommended way? Note: we are producing fast; I just think that we aren't producing in the correct way with regard to this client.


robinreeves commented 6 years ago

I'm also having issues trying to figure out how to scale this. Things work fine most of the time, but the way I've coded it there are times where I end up killing the process, I believe due to memory consumption. I think part of the issue is the number of Tasks I'm creating. I should probably use the handler approach instead, but I have a slight issue with it. I assume the memory usage is due to memory being consumed by the message data while awaiting acknowledgements; I can watch the produced, but unacknowledged, message count climb as the memory usage increases. I've set queue.buffering.max.ms = 2000, but I haven't implemented compression in production yet; I intend to do that during my next release cycle. There is a process that drops fairly large messages (0.5 MB) periodically, and when that backs up this issue occurs. I realize the message must be retained so retries can be attempted, but is there any way memory can be better managed, or, more likely, is there a way I can better optimize how I am using the library?

Side note on the delivery handler pattern. Currently, I use a surrogate key, which is not part of the message itself, as an identifier, and I need this identifier in the handler to mark the message acknowledged. Using the Task pattern, I can easily pass this identifier to myself. Using the handler, since I use the same handler for all messages, the only way I can think of to get the message identifier into the handler is to use it as the partitioning key. This probably isn't a big deal, but I don't like that it is the only option. The key is meaningless as far as the message is concerned, but now I'm partitioning on it. I'd much prefer round robin. Am I missing something here too?

Another aside, would it be possible for someone who knows the intended usage to post some more complex examples and maybe some commentary about how best to scale in particular scenarios? I'd be happy to contribute to the project myself, but I'm not sure what I can offer at this point.

mhowlett commented 6 years ago

what is the approximate throughput at which you're running into problems? it seems like memory usage can be optimized here somewhere - I've not profiled this tbh, though I'm not aware of anywhere that would be hugely inefficient. it would be interesting to see how many Tasks are surviving gen 0 garbage collection. you could set queue.buffering.max.kbytes lower, which, by default, will cause produce requests to block, though this is obviously not ideal. really, we need to profile memory usage and go from there. I've opened an internal task to track this.

yep agree, we need some more complex examples, it's on the todo list.

you can set the partitioner config property to random.
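
For reference, a minimal illustrative sketch (not from the thread) of what setting the partitioner to random might look like, using the 0.11.x-style Dictionary<string, object> configuration seen elsewhere in this discussion. The broker address, topic name and key are placeholders, and the property is placed under default.topic.config on the assumption that, like message.timeout.ms, it is a topic-level librdkafka setting:

using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "broker1:9092" },       // placeholder broker address
    { "default.topic.config", new Dictionary<string, object>
        {
            { "partitioner", "random" }            // ignore the key when assigning partitions
        }
    }
};

// the key can now carry an application identifier (e.g. a surrogate key) without
// influencing which partition the message lands on
using (var producer = new Producer<string, string>(
    config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
    producer.ProduceAsync("my-topic", "surrogate-key-123", "payload");
    producer.Flush(10000);
}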

robinreeves commented 6 years ago

I need to do some deeper analysis also. I'll see if I can do some memory profiling sometime this week. This has been working perfectly unless I get a backlog, so I've let it linger for a while. I also thought I'd look into it more deeply once v1.0 was officially released, but I should probably just get on it. Based on what I'm seeing, it certainly could be a legitimate build up of messages in the sending queue. I can watch the memory usage go up and down as messages wait for acknowledgement. My Kafka instance is in AWS as well, and we have seen some network latency there, so lots of potential culprits that aren't the producer class.

I'm not producing at a level that should be even close to requiring a distributed solution. Generally, I'm producing about 40K messages an hour in the 10s to 100s of KB per message. When I see issues I'm trying to send several thousand messages at about .5 MB per message. This doesn't seem like a large amount of data, especially in the Kafka world, which is why I assume I'm probably doing something wrong, or the network latency really is extremely poor. Compression might fix this right up.

I'm not sure the queue size is the issue. I've seen the service consume over 3 GB. Even if I'm holding two copies of the message data, that should be just over 2 GB, at a rough estimate. Is there a way to interrogate the underlying C classes to examine the current queue size? It looked like you might be exposing something like that in v1.0, but I wasn't sure if that was the intent.

I'll profile a Task based and a handler based version and see how they compare. I'll let you know what I find.

edenhill commented 6 years ago

Is there a way to interrogate the underlying C classes to examine current queue size? It looked like you might be exposing something like that in v1.0, but I wasn't sure if that was the intent.

Register an OnStatistics handler and set statistics.interval.ms and you'll get internal queue counts and sizes as described here: https://github.com/edenhill/librdkafka/wiki/Statistics
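
For anyone following along, a rough sketch of wiring this up, assuming the 0.11.x producer API where OnStatistics is an event that delivers the raw statistics JSON; the broker address and interval are placeholders:

using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "broker1:9092" },   // placeholder broker address
    { "statistics.interval.ms", "10000" }      // emit statistics every 10 seconds
};

using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
    // json is the librdkafka statistics document described on the wiki page linked above;
    // it includes producer-level and per-broker queue counts and sizes (msg_cnt, msg_size, outbuf_cnt, ...)
    producer.OnStatistics += (_, json) => Console.WriteLine(json);

    // ... produce as usual ...
}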

robinreeves commented 6 years ago

I have been retaining the statistics JSON from the beginning. I think I've only been polling every 10 minutes, but I'll go back and look them over during periods where I was having issues. I'll collect them on more frequent intervals during testing.

I must have missed the Wiki page with all the field info because I was never sure what all the stats meant.

robinreeves commented 6 years ago

I ran some tests this afternoon and it does appear network latency is the driving factor here. I finally have a test cluster in our data center and there are zero issues there. Switch over to AWS and messages start queuing. Memory profiling shows most of the memory allocated to TypedTaskDeliverHandlerShim objects, and the number of objects on the heap is pretty consistent with unacknowledged messages. All of this lines up with the statistics information in the JSON. So I should be yelling at the Hadoop architect and not you folks. It isn't like we identified this issue nine months ago with other parts of the Hadoop stack.

Side question: is there an error in the documentation for the configuration properties? It shows the default value for queued.max.messages.kbytes as 1048576, but the stats show msg_size_max as 4096000000. I'd always thought the max size of the queue was 4 GB, but I'm not sure why I thought that. Also, how do the statistics work? Do they reset each time they are emitted? If not, how do I know what the time frame is for all of the counts?

One other item of note is that I'm seeing some timeouts, which I assume are due to the backlog in the queue taking too long to process. There are quite a few local message timeouts, but the stats show some timeouts and retries on only one of the nodes. Are these timeouts the same as the local timeouts, or do I also have items timing out during sending? Given that only one node appears to have this issue, maybe there is some kind of issue with just that node. I know we saw an issue with Sqoop a while back where a particular node seemed to have a network latency issue but others did not. Anyway, I'll just have to work with the Hadoop admins to get this working more smoothly. Thanks for helping point me in the right direction.

DarrenHart1989 commented 6 years ago

Apologies for not replying after creating this issue. After posting, I ran a series of tests with different configurations. The fastest and most reliable is below:

private readonly Dictionary<string, object> _defaultConfig = new Dictionary<string, object>()
{
    { "compression.codec", "lz4" },        // compress messages being sent with LZ4
    { "socket.blocking.max.ms", "1" },     // limit how long librdkafka blocks on socket operations; keeping this low matters on Windows
    { "socket.nagle.disable", true },      // disable Nagle's algorithm (TCP_NODELAY)
    { "retries", "3" },                    // attempt to retry 3 times before failing the message send
    { "request.required.acks", "1" },      // at least one broker must respond that it has received the message set
    { "default.topic.config", new Dictionary<string, object>()
        {
            { "message.timeout.ms", "10000" }  // set the timeout on the topic to be 10 seconds
        }
    }
};

We found that the two most important settings, especially in a Windows environment, were { "socket.blocking.max.ms", "1" } and { "socket.nagle.disable", true }; all the others had little effect on our load.

I also tested it with different sizes of messages being sent, from small (~2 KB) to large (~5 MB).

@robinreeves our setup seems similar to yours, in that we were sending to a remote AWS machine; however, we discounted network latency with AWS, because we came across the same issues even when we ran within AWS's network (i.e. their local IP address space). Note that these settings didn't solve all the issues (sometimes messages are still dropped for no apparent reason), but they do solve the vast majority of the weird failures that were happening, and our backoff wrapper handles the rest pretty well.

Hope this helps?

mhowlett commented 6 years ago

there is actually a bug with socket.nagle.disable on most platforms (unsure about windows) in 0.11.4, which is resolved in 1.0-experimental-4, which I'm going to upload to nuget later today. for low throughput scenarios this can be a very important setting for reducing latency (otherwise the bad interaction between Nagle and delayed ACK will cost you an extra 40ms). increasing linger.ms will allow for better throughput than the default settings at the cost of worse average latency, but this is really only necessary for extremely high throughput scenarios. the default settings are good general purpose settings.
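
To make the linger.ms trade-off concrete, a small illustrative sketch (not from the thread) in the same Dictionary<string, object> config style; the broker address is a placeholder, and it assumes linger.ms is accepted as the librdkafka alias for queue.buffering.max.ms in the version being used:

using System.Collections.Generic;

// higher linger.ms => larger batches => better throughput, but each message may sit in
// the local queue for up to linger.ms before being sent, so average latency gets worse
var highThroughputConfig = new Dictionary<string, object>
{
    { "bootstrap.servers", "broker1:9092" },   // placeholder broker address
    { "linger.ms", "50" },                     // wait up to 50 ms to fill larger batches
    { "compression.codec", "lz4" }             // larger batches also compress better
};

// lower linger.ms => messages are dispatched almost immediately => lower latency,
// at the cost of smaller batches and lower peak throughput
var lowLatencyConfig = new Dictionary<string, object>
{
    { "bootstrap.servers", "broker1:9092" },   // placeholder broker address
    { "linger.ms", "1" }                       // dispatch messages almost immediately
};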

DarrenHart1989 commented 6 years ago

Will increasing linger.ms not actually increase the likelihood of timeouts happening, as the internal queues will hold onto the messages for longer? Or have I misunderstood the importance of linger.ms?

The default settings are in effect what we used initially, with the addition of compression, acks and timeouts, and we found that this would constantly give us the Local Message Timeout error during minimal load, and we would only get a small rate of messages per second. The socket.blocking.max.ms setting actually seems to be the important one here, as it pushed us from doing 400 messages a second to well over 12,000/s on very modest hardware. We believed it was down to the delayed-ACK/Nagle behaviour that Windows implements, which isn't really seen on other OSes, but this could also just be a coincidence. What is the default for this setting?

mhowlett commented 6 years ago

@DarrenHart1989 (just looked at your code) since you're producing synchronously, socket.blocking.max.ms will indeed be the most important setting on windows until we change the librdkafka internals to work well with Windows. For synchronous production, latency will be the bottleneck for throughput. If you produce asynchronously, max throughput will be more like 250k msgs/s. increasing linger.ms will improve batching, which improves throughput, but if you're producing synchronously there is no batching, so it will act to increase latency and reduce throughput.

DarrenHart1989 commented 6 years ago

@mhowlett we aren't producing synchronously - unless I have missed something. All we do is add a task.Wait(), which doesn't cancel the task when the time limit is reached; it just tells us whether the task completed within the timeout. If it did, all is well; if it didn't and the timeout was reached, we assume that something internal has gone wrong (either with Kafka or the network).

Unless I have misunderstood how the ProduceAsync method actually works, I assumed it would complete the task when the message had been handed over to the librdkafka internals, rather than waiting around until a batch of messages has been sent?

robinreeves commented 6 years ago

No, the Task finishes when the delivery report is generated, after the message is acknowledged by Kafka.

mhowlett commented 6 years ago

task.Wait() will block until the delivery report is available.

DarrenHart1989 commented 6 years ago

Apologies, I had assumed the delivery report came from the internal librdkafka queue, not the remote broker, so I guess that would explain why that setting was so important.

So should we remove the wait and not hang around for any task completion, or do you just use continuation options on the ProduceAsync methods to see whether it has failed or not?

mhowlett commented 6 years ago

so many options .. the cleanest thing to do, if you need to handle errors in a sequential flow (as it looks like you are trying to do), is to use async/await and just await the produce call, but this is possibly a big change to your application. If you remove the Wait, checking for exceptions inline is not useful either - that all needs to be done higher up somewhere. You could also consider using the callback variant of ProduceAsync, but this is structurally more complex / less convenient than using async/await.
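
For illustration, a possible reshaping of the SendOne method from the top of this thread along those lines: await the produce call and handle failure in the same sequential flow instead of blocking with task.Wait(). This is only a sketch; it reuses the names from the earlier snippet (_producer, _topicName, _errLog, _name, LogMessage) and assumes the 0.11.x behaviour where a failed delivery is reported via the returned Message's Error property rather than a faulted task:

public async Task<Message<Null, string>> SendOneAsync(string data)
{
    try
    {
        // the awaited task completes when the delivery report is available,
        // i.e. after the broker has acknowledged (or rejected) the message
        var message = await _producer.ProduceAsync(_topicName, null, data);

        if (message.Error.HasError)
        {
            // delivery failed; keep the payload so it can be retried later
            LogMessage(data);
            _errLog.Error($"[{_name}] kafka delivery failed: {message.Error.Reason}");
        }

        return message;
    }
    catch (Exception e)
    {
        // ProduceAsync itself faulted (e.g. the local queue is full); retain the payload
        LogMessage(data);
        _errLog.Error($"[{_name}] failed to execute kafka producer ProduceAsync", e);
        throw;
    }
}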

DarrenHart1989 commented 6 years ago

I guess I could use a combination of continuations and event subscriptions? So instead of doing async/await, we can use continuations and fire an event when an error occurs; I guess that will then allow us to manage it at a higher level - in this instance via the controller, or even wrapping the Producer as a singleton within the web app.

I will have another look into this and re-run our tests against it to see which approach works best - it could end up being that this way is the best for our application.

robinreeves commented 6 years ago

I implemented it using ContinueWith. I need guaranteed message delivery, so I need to process the delivery report and mark messages acked or resend them. It is pretty simple. I don't know if this is the best implementation (I'm considering changing to use the DeliveryHandler instead), but it seems to work. Also, now that I know about the random partitioning option, I should just use the Key instead of passing the field to my method.

var produceTask = producer.ProduceAsync(message.KafkaTopicName, message.TransmissionQueueId, message.Message);

SetMessageSent(message.TransmissionQueueId);

produceTask.ContinueWith(task => ProcessKafkaMessageResult(task, message.TransmissionQueueId));

DarrenHart1989 commented 6 years ago

So I decided just to change it to remove the task.Wait() and use the delivery handler method, which appears to work (@robinreeves - thanks for the direction on that one) - we still only get 25K msgs/s, but that could be down to some other settings we are using, so I can experiment more with that.

Can I just ask: is it better to have a larger message.timeout.ms value? We currently set this to 10000, but saw that the librdkafka default is 300000 (or the lesser socket.timeout.ms, which defaults to 60000). Will making this a very large value help with the Local: message timed out error during peak loads, or do these settings not have any effect on this error?

robinreeves commented 6 years ago

I made the compression change in production and I think it caused another issue. I believe that since I increased throughput, and the number of messages per batch, more Tasks are being fired off at once and it is slamming my database. It is my understanding that the DeliveryHandler runs on the poll thread, which means it won't all fire at once. However, if the handler doesn't complete quickly enough, a backlog will build because the poll thread is being held up. Am I understanding this correctly? I suppose I could put some kind of semaphore in there to limit concurrent executions as well.

I also have a suggestion. I see the advantage of providing a handler for each message if you want to do different things, but in my case everything gets the same handler. For what I'm doing, it would be more efficient if I could receive the acknowledgements for an entire batch at once so I could commit them to the database together. With the current process, I end up opening and closing a connection for each message, which isn't ideal. It would be a nice feature if there were an option to set a DeliveryHandler for a batch and receive it all at once. I don't know if this is reasonable as the code is currently designed, but I'll bet there are others out there who might benefit from receiving everything in a batch at once.

mhowlett commented 6 years ago

yes, all delivery handlers are called sequentially on a single thread. it would be straightforward to do the batching yourself - have the delivery handler in a class that manages batching. It would perhaps often be most optimal to have the batches align with kafka protocol responses, but this information is not exposed. Still, I think it's unlikely we'll ever expose batched delivery reports - it would require significant changes and additional api complexity, and it's not clear there is enough benefit. You could consider utilizing the thread pool from your delivery handler.
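
As a concrete example of "have the delivery handler in a class that manages batching", here is a rough, hypothetical sketch (none of these names are library API): delivery reports are still handled one at a time, since they arrive sequentially on the poll thread, but the database commit is deferred until a batch has accumulated:

using System;
using System.Collections.Generic;
using Confluent.Kafka;

public class BatchingAckHandler
{
    private const int BatchSize = 100;                        // tune to taste
    private readonly List<long> _ackedIds = new List<long>();
    private readonly Action<IReadOnlyList<long>> _saveAcknowledgedBatch;

    public BatchingAckHandler(Action<IReadOnlyList<long>> saveAcknowledgedBatch)
    {
        // callback that marks a whole batch of messages acknowledged in one database round-trip
        _saveAcknowledgedBatch = saveAcknowledgedBatch;
    }

    // call this from the per-message delivery handler / ContinueWith continuation;
    // no locking is used because delivery reports arrive on a single thread
    public void HandleDeliveryReport(long transmissionQueueId, Message<string, string> report)
    {
        if (report.Error.HasError)
        {
            // leave failed messages unacknowledged so they will be resent
            return;
        }

        _ackedIds.Add(transmissionQueueId);
        if (_ackedIds.Count >= BatchSize)
        {
            Flush();
        }
    }

    // also call this when the producer is flushed or shut down so a partial batch is not left behind
    public void Flush()
    {
        if (_ackedIds.Count == 0) return;
        _saveAcknowledgedBatch(_ackedIds.ToArray());   // one database round-trip per batch
        _ackedIds.Clear();
    }
}

If the database write itself is slow, the batch save could be offloaded to the thread pool (as suggested above), at the cost of giving up the ordering that the single poll thread provides.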

robinreeves commented 6 years ago

So when you're invoking the handler on the poll thread, are you invoking it asynchronously? If so, simply changing to the DeliveryHandler shouldn't address my problem. I can think of several ways to deal with it, but some of them would only be an improvement if the handlers are called synchronously, to avoid a flurry of database connections. I did consider batching myself, but that's a bigger change, and right now things aren't working so well in my production environment, so I'm trying to come up with the simplest solution. I already have the DeliveryHandler code ready to go, so that's why I'm focusing on that.

mhowlett commented 6 years ago

when you produce using a method that accepts a delivery handler, all callbacks are called sequentially on a single thread. if you make the IDeliveryHandler.HandleDeliveryReport async, the .net framework will call it on a thread pool thread, so you could achieve concurrency that way.

mhowlett commented 6 years ago

closing this as it's a question thread that's now pretty old. action item is to make a related example (we're tracking this task internally).