confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Losing/dropping Messages in High Availability Producer #375

Closed gazareid closed 6 years ago

gazareid commented 6 years ago

Description

I have a topic that needs to

I have set up an asynchronous producer in a loop that sends 2 million random messages to a topic "testpoc". The topic "testpoc" has 3 partitions on my 3 broker cluster and is set to a replication factor of 3. This all works perfectly under normal circumstances; however, I need to test the contingency that a server may go down during a high load of incoming messages. To create this scenario I start the producer, which takes a couple of minutes to send its 2 million messages, and during that time I stop whichever broker the quorum has elected as controller. When I do, I would expect Kafka's at-least-once guarantee to deliver all 2 million messages even if the controller goes down. However, each time I do this I see a loss of a hundred to several thousand messages.

I have tried making the producer synchronous like so: producer.ProduceAsync("garytest011", key, msgToSend.Msgs.ToString()).Result; however, this totally tanks my throughput, just as the documentation says it will. What is the best way to guarantee "at least once" sending without killing my performance?

How to reproduce

I'm running a 3 broker cluster of ZooKeeper/Kafka on 3 separate VMs. All settings are set to default. In a C# .NET Core 2.0 console app I create a basic producer like so:

static void Main(string[] args)
        {
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();

            var kafkaConfig = new Dictionary<string, object>()
            {
                ["bootstrap.servers"] = "kafkabroker01:19092,kafkabroker02:19092,kafkabroker03:19092",
                ["retries"] = 5,
                ["retry.backoff.ms"] = 1000,
                ["client.id"] = "test-clientid",
                ["socket.nagle.disable"] = true,
                ["default.topic.config"] = new Dictionary<string, object>()
                {
                    ["acks"] = -1, // "all"
                }
            };

            var producer = new Producer<string, string>(kafkaConfig, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));

            for (int index = 0; index < 200000; index++)
            {
                var key = "key" + index;
                var msgToSend = new SimpleClass(key, index, 10);
                var deliveryReport = producer.ProduceAsync("testtopic", key, msgToSend.Msgs.ToString())
                    .ContinueWith(task =>
                {
                    if (task.Result.Error.HasError)
                    {
                        Console.WriteLine($"Failed message: key={task.Result.Key} offset={task.Result.Offset} ");
                    }
                });
            }

            Console.WriteLine("Sent 2 million messages to testtopic");
        }

        public class SimpleClass
        {
            public string Key { get; set; }
            public int Counter { get; set; }
            public Dictionary<string, string> Msgs { get; set; }
            public int[] IntArray { get; set; }

            public SimpleClass(string key, int counter, int numMsgs)
            {
                Key = key;
                Counter = counter;
                Msgs = new Dictionary<string, string>(10);
                for (int i = 0; i < numMsgs; i++)
                {
                    var textGUID = Guid.NewGuid().ToString();
                    Msgs.Add(textGUID, "GUID = " + textGUID);
                }

                Random rnd = new Random();
                var size = rnd.Next(1, 50);
                IntArray = new int[size];
                for (int j = 0; j < size; j++)
                {
                    IntArray[j] = rnd.Next();
                }
            }
        }

While the process is running I stop the Kafka elected controller to simulate a server going down. Because the loop runs 2 million times there should be 2 million (or more, because of at-least-once processing) messages in the topic. I count the messages in the topic with the following command:

docker run --net=host --rm confluentinc/cp-kafka:3.3.1 kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:29092 --topic testtopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

And I see anywhere from a hundred to several thousand fewer than 2 million.

edenhill commented 6 years ago

Are you seeing any Failed message: ..?

gazareid commented 6 years ago

The only thing I'm seeing in the console is: image

edenhill commented 6 years ago

You need to call Flush() before disposing the producer to make sure all in-queue and in-flight messages are delivered (or fail).

gazareid commented 6 years ago

Okay, that sounds logical. I tried that by adding the following:

var ret = producer.Flush(5000);
            Console.WriteLine("Flushing ret=" + ret);

and I still dropped ~2600 messages. Here are the errors I got in the console: image

And just to clarify, I'm not bringing the broker back up. My test is to intentionally leave it down.

when I count at the end of all this I get: 174422

edenhill commented 6 years ago

Use a higher timeout for Flush() to allow all your messages to be produced (the upper limit would be somewhat higher than message.timeout.ms)

gazareid commented 6 years ago

I see that the default value for message.timeout.ms is 300000, so I set the flush timeout to 350000 like so: var ret = producer.Flush(350000); Is this what you were suggesting? With this set there was no noticeable difference in the number of messages dropped. A few minutes after the loop completes, all the console writes that I put in the loop print out with the error "Local: Message timed out". If I'm not doing what you suggested please let me know; any other suggestions would be greatly appreciated.

gazareid commented 6 years ago

Also, task.Result.Error.HasError is being triggered at timeout, rather than the message being retried. Is this what it is supposed to do?

gazareid commented 6 years ago

Okay @edenhill I think I've reached a solution but I don't really like it. I set up a separate producer that runs and regularly flushes the queue producer.Flush(100); and I handle the errors by retrying all the failed messages. Then I lowered the message.timeout.ms to allow me to timeout messages and retry them more frequently. This has resolved my issue of dropping messages, but it's a job I expected the library to do itself.

Is there a way to guarantee "at least once" producing of messages without having to handle errors myself?

gazareid commented 6 years ago

Here's my entire producer https://github.com/gazareid/gaza-confluent-kafka-dontnet-producer/blob/master/Program.cs

edenhill commented 6 years ago

I don't see where you are counting the number of delivered messages, you only seem to count errored messages that are retried: https://github.com/gazareid/gaza-confluent-kafka-dontnet-producer/blob/master/Program.cs#L60

So how do you know if all messages are produced or not?

A message.timeout.ms of 3s is very low, I suggest increasing it and relying on the library to perform retries for you.

Also, you really don't need to call Flush() from within your producer loop, it is typically only used at termination to wait for the outstanding messages to be delivered or fail.

So:

gazareid commented 6 years ago

@edenhill Thanks for replying so much, you're awesome! To count them I'm actually doing a count of offsets in the topic on the Kafka server with the following command:

docker run --net=host --rm confluentinc/cp-kafka:3.3.1 kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:19092 --topic testtopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

I have a flush in the loop because in my testing it appeared to be more efficient to flush regularly, but that could have been a fluke of testing. I have been looking for documentation on Flush(); if you have a link to any I would really appreciate it.

I always had a flush at the end too. So I:

I updated my producer here: https://github.com/gazareid/gaza-confluent-kafka-dontnet-producer/blob/master/Program.cs#L60

Results

I ran the following test 3 times with small differences in the resulting counts and times.

  1. After starting the app
  2. I counted the offsets in the topic and could see that I was successfully producing messages there.
  3. Then killed the controller of the cluster to force a controller election while still producing to the testtopic.
  4. This is when I see the error messages in the console app below (as expected): image
  5. Then I wait, occasionally counting the offsets with the command at the top of this post.
  6. Next, at ~6:20 I see my counts displayed: image This doesn't add up to 1 million (992990)
  7. And I run my count of offsets again which returns 917263. Adding this to the errored messages doesn't add up to 1 million either. So there are 82737 total messages missing.
  8. Lastly I wait an additional 20 minutes to ensure no lagging buffer is released unexpectedly late.

I know that these messages are left over from when the down broker was unavailable, but why aren't they getting automatically resent by the library after the timeout expires?

edenhill commented 6 years ago

One last thing to try is to increase the Flush() timeout to a larger value, well above message.timeout.ms+socket.timeout.ms. In your case I suggest 5m

gazareid commented 6 years ago

I adjusted the Flush() timeout to 300000 ms (5 min). And just to be clear, the way that is done is to put "300000" in the parentheses in "Flush()", right? Like this:

producer.Flush(300000);

If there is some documentation on "flush()" I haven't been able to find it. Any links would be greatly appreciated.

I ran another test like those from above and it still lost messages only producing 988644/1000000 messages. Can you Clone or Fork my producer and see if you can recreate the issue? Is this a bug?

mhowlett commented 6 years ago

I have just skimmed this conversation for now, but here's a few notes:

documentation on Flush: https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer-2.html#Confluent_Kafka_Producer_2_Flush_System_Int32_

Make sure you Dispose your producer (put it in a using block is easiest).

Prefer to use a variant of ProduceAsync (yes I know badly named, we'll change that) that takes a deliveryHandler callback, not one that returns Task / use ContinueWith. This will be more efficient.

Ensure the value returned by Flush before exiting your program is 0. If it's not, all the messages haven't been delivered.

You may want to set linger.ms to be 50 or something if you want higher throughput.

I don't expect there is a bug, but I want to get to the bottom of what's going on.
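
A minimal sketch of the "make sure Flush returns 0 before exiting" advice, in case it helps. `FlushHelper` and `FlushUntilEmpty` are hypothetical names made up for illustration, not part of the client. The only assumption about the library is the one already visible in this thread: `Flush(int)` takes a timeout in milliseconds and returns the number of messages still outstanding. The flush call is passed in as a delegate so the loop can be exercised without a broker.

```csharp
using System;
using System.Diagnostics;

public static class FlushHelper
{
    // Calls flush(sliceMs) repeatedly until it reports 0 outstanding messages
    // or the overall deadline has passed. Returns the last reported count, so
    // a non-zero result means some messages were never delivered or failed.
    public static int FlushUntilEmpty(Func<int, int> flush, int sliceMs, TimeSpan deadline)
    {
        var clock = Stopwatch.StartNew();
        int remaining;
        do
        {
            remaining = flush(sliceMs);
        } while (remaining > 0 && clock.Elapsed < deadline);
        return remaining;
    }
}
```

With a real producer this might be called as `FlushHelper.FlushUntilEmpty(ms => producer.Flush(ms), 10000, TimeSpan.FromMinutes(6))` inside the `using` block, just before the producer is disposed, logging a warning if the result is not 0.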

gazareid commented 6 years ago

@mhowlett Thanks for taking this issue, and thanks for the link on Flush.

I am still dropping messages. I do a count on the offset in my testtopic with this command:

docker run --net=host --rm confluentinc/cp-kafka:3.3.1 kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:19092 --topic testtopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

and it returns 995909 offsets.

Appreciate you looking at this. Please let me know what else I could try.

I updated the producer I'm using here: https://github.com/gazareid/gaza-confluent-kafka-dontnet-producer/blob/master/Program.cs

gazareid commented 6 years ago

@mhowlett @edenhill It's been a while since either of you have commented. Is this still being looked at? I have been asking in forums. I even tweeted about it. This is a major roadblock for my people.

mhowlett commented 6 years ago

Your code is incrementing successful and errored on many threads with no synchronization. This would almost certainly mean the line Console.WriteLine($"errored: {errored} successful: {successful}"); will understate the number of messages that actually succeeded / errored. What does this line print out when you run the code? And what is the corresponding number of messages reported by your kafka.tools.GetOffsetShell call?

The code that ensures all messages have been flushed is commented out. Are you sure the value returned by the final call to Flush is 0?
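
For the counting problem, one possible shape is sketched below. `DeliveryCounters` is a hypothetical helper name; it uses only `System.Threading.Interlocked` from the base class library, so increments made from many thread pool threads are not lost.

```csharp
using System;
using System.Threading;

public sealed class DeliveryCounters
{
    private long _successful;
    private long _errored;

    // Safe to call concurrently from any number of continuation threads.
    public void Record(bool hasError)
    {
        if (hasError)
            Interlocked.Increment(ref _errored);
        else
            Interlocked.Increment(ref _successful);
    }

    // Interlocked.Read gives an atomic 64-bit read even on 32-bit runtimes.
    public long Successful => Interlocked.Read(ref _successful);
    public long Errored => Interlocked.Read(ref _errored);
}
```

Inside the existing ContinueWith callback this would be used as `counters.Record(task.Result.Error.HasError);`, and the two totals printed after the final Flush should then add up to the number of ProduceAsync calls.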

gazareid commented 6 years ago

@mhowlett Thank you so much for getting back to me. I only put the incrementing successful and errored code in because @edenhill asked me to,

"count the number of successful and failed deliveries (in your Produce ContinueWith Task) and print those two numbers, along with the return value of Flush(), at the end of main." -@edenhill

I could see after the first run that it wasn't counting every loop. But the command I'm running to count the messages should be accurate.

docker run --net=host --rm confluentinc/cp-kafka:3.3.1 kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:19092 --topic testtopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

I only commented out the flush lines because I was testing both with and without it and mistakenly committed the code where it was commented out. In my testing with those lines uncommented it reached 993430 messages (as seen with the above command) before the flush, and then the flush loop ran until it returned 0. I ran the count command again. No change. 993430 messages.

edenhill commented 6 years ago

@gazareid Try to split the problem into smaller chunks:

If you've reached this state with all messages accounted for (i.e., success + errored == your Produce() count), then move on to checking with a proper Consumer if you can consume all those messages.

The GetOffsetShell reports the latest offset for each partition, but the latest offset does not tell you how many messages there are in the partition, due to retention, compaction, etc, so it is not a reliable tool for what you are trying to do. Instead use a proper Consumer and consume the messages you expect.

mhowlett commented 6 years ago

GetOffsetShell won't, in general, give the number of messages in the partition, but the latest offset should be greater than or equal to the number of messages in the partition, and here it is not. I think I'm sufficiently worried that I'm going to try and replicate. @gazareid, it would still be good to see if you get the same count by consuming and counting messages vs GetOffsetShell (and if you do implement that, feel free to link to your program). Sorry for the delays. We have more high priority tasks than we know what to do with, and this issue is superficially something we would be surprised if it was broken (though a serious problem if it is), hence the delay in addressing it.

I looked at our verifiable client tests @edenhill, and I see a test that covers something close to this scenario for consumers, but not producers (I may be missing something though).

gazareid commented 6 years ago

@mhowlett Please let me know the results of your attempts to replicate. This matter in our opinion is CRITICAL as we cannot go into production with a massive hole in the availability of the solution. Please also know that in my team's testing the Java client does not do this and ‘out of the box’ works fantastically. However, considering our environment we would prefer to use the dotnet client.

I did as @edenhill advised

Conclusion

All three sources of information we have, the kafka count of the topic, the consumer's thread-safe count and the couchbase db count show a lot of missing messages. My number of messages written to that database after the flush = 0 was similar to the tests done before. Missing messages when I take a broker down.

I didn't post the code, but here is a screenshot of my Couchbase DB before consuming the messages: image

After consuming the messages: image

mhowlett commented 6 years ago

I tried to reproduce by doing the following:

  • Added synchronization to the success/error counts in the above code.
  • Created a 3 partition topic with replication factor 3 on a 3 broker cluster.
  • Killed the controller broker while the program was running.

I'm on macOS, .NET Core 2, librdkafka 0.11.3.

the application prints: errored: 13298 successful: 986702 The sum of the end offsets in each partition using GetOffsetShell is 986702 which equals the number of messages successfully produced.

so, so far I've been unable to reproduce.

gazareid commented 6 years ago

@mhowlett
Thank you so much for reproducing for me. You HAVE recreated my issue. Your 13298 messages are what I'm concerned about. They aren't being "retried" by the library automatically as very clearly explained in @edenhill 's documentation here: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#message-reliability

Message reliability

Message reliability is an important factor of librdkafka - an application can rely fully on librdkafka to deliver a message according to the specified configuration ("request.required.acks" and "message.send.max.retries", etc). If the topic configuration property "request.required.acks" is set to wait for message commit acknowledgements from brokers (any value but 0, see CONFIGURATION.md for specifics) then librdkafka will hold on to the message until all expected acks have been received, gracefully handling the following events:

  • Broker connection failure
  • Topic leader change
  • Produce errors signaled by the broker

This is handled automatically by librdkafka and the application does not need to take any action at any of the above events. The message will be resent up to "message.send.max.retries" times before reporting a failure back to the application.

And the default settings for the retry logic should suffice for my situation. Or I assume @edenhill would have suggested a custom configuration by now.

If I pass a retry function through the "ContinueWith" method of the producer then I'm able to get every message I send, every time. Like so:

for (int index = 0; index < 1000000; index++)
{
  var key = "key" + index;
  var msgToSend = new SimpleClass(key, index, 10);
  ProduceMessages(key, msgToSend.Msgs.ToString());
}

void ProduceMessages(string key, string msgToSend)
{
  producer.ProduceAsync("testtopic", key, msgToSend)
    .ContinueWith(task =>
    {
      if (task.Result.Error.HasError)
        {
          Console.WriteLine($"Resending Message: key={task.Result.Key} offset={task.Result.Offset}");
          ProduceMessages(key, msgToSend);
        }
    });
}

but handling this retry logic is something the library should be doing for me.

mhowlett commented 6 years ago

I'm not sure why librdkafka is considering this a non-retriable error - it seems to me that it shouldn't need to, though I expect there is some technical reason why this is the case.

there are other scenarios that are considered not retriable as well. generally, you definitely need to handle delivery reports with errors and assume the message didn't get through.
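
As a rough illustration of handling errored delivery reports in the application, here is a bounded variant of the retry shown earlier in this thread. `BoundedRetry`, `SendAsync` and the `sendOnce` delegate are hypothetical names for this sketch; `sendOnce` stands for one produce attempt that returns true when the delivery report has no error. Unlike the recursive version, it gives up after `maxAttempts` and waits between attempts.

```csharp
using System;
using System.Threading.Tasks;

public static class BoundedRetry
{
    // Runs sendOnce up to maxAttempts times, pausing for `backoff` between
    // attempts. Returns true as soon as one attempt succeeds and false if
    // every attempt fails, so the caller can log or persist the message.
    public static async Task<bool> SendAsync(
        Func<Task<bool>> sendOnce, int maxAttempts, TimeSpan backoff)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (await sendOnce().ConfigureAwait(false))
                return true;
            if (attempt < maxAttempts)
                await Task.Delay(backoff).ConfigureAwait(false);
        }
        return false;
    }
}
```

With the producer from this thread, `sendOnce` could be something like `async () => !(await producer.ProduceAsync("testtopic", key, value)).Error.HasError`. Note that messages resent this way can arrive out of order relative to later messages and can be duplicated, so consumers need to tolerate both.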

joncatlin commented 6 years ago

I work with Gary and am concerned that the underlying librdkafka is not working correctly. Having spent time with the Java client, it does not exhibit this sort of behavior. Out of the box the Java client handles this situation without having to manually retry anything. I know they are different technologies but I would have thought that if the Java producer can handle this situation then another implementation could also.

I am concerned that to use the C# software we are going to have to first trap and handle these timeouts and then deal with the inevitable reordering that this will cause. Can you confirm that the timeouts would be delivered in the exact same order the messages were sent? If not how do we cope with the reordering issue?

Any help with this would be greatly appreciated as we are trying to go into production soon and this is a fundamental issue that we need to resolve.

mhowlett commented 6 years ago

If you have configured max retries to be 1 then delivery reports are guaranteed to be provided in the order the messages were sent provided you use a deliveryHandler variant of ProduceAsync. If you use ContinueWith's (like you are doing), tasks are given to you on a thread pool thread, so order cannot be guaranteed. Note that any messages in the librdkafka queue will be sent, if possible, regardless of previous errors, so the only way to ensure in-order message delivery is to produce synchronously (which is slow).

Recent versions of the Java client include an idempotent producer, which guarantees exactly once in-order delivery of messages in a much more performant way to the above. This is currently not implemented by any other client.

Typically, Kafka consuming applications are written to handle out of order messages and duplicates. You could also think if your software can be written to work this way.

mhowlett commented 6 years ago

@joncatlin @gazareid - thanks for your persistence. you are right, this scenario should be auto-retried. It'll be fixed in the next release.

edenhill commented 6 years ago

We're working on formalising and fixing the retry behaviour in librdkafka, we'll keep you posted.

joncatlin commented 6 years ago

Many thanks guys for digging in and getting to the bottom of this. Please let us know if we can help with the testing.

gazareid commented 6 years ago

@edenhill @mhowlett You guys are awesome. Thank you very much for your stellar community involvement.

Imran-Wahab commented 6 years ago

@edenhill @mhowlett: We are also facing data loss during graceful shutdown of one of the Kafka broker nodes during performance testing. We are using Java client 1.0.0 for testing. Following are the details:

  • Kafka Broker 1.0.0, 3 node cluster with replication factor of 3
  • A topic is created with 400 partitions
  • 4 Kafka producers (version 1.0.0), each sending 1000 msgs per second to the topic using the producer's async API

We run the performance load for 10 mins, and in between one of the Kafka nodes is shut down gracefully. We expect the producers to send 4 * 1000 * 60 * 10 = 240000 msgs to the topic, but the topic contains 239465 (535 fewer) msgs. Also, we are not seeing any error/exception in the producer callback, so it's not clear where these 535 msgs have gone. Is it some issue with the Kafka producer, or are we missing some setting on either the producer or the broker side?

I am not sure whether this is correct thread to discuss this issue. Please bear with me on this

edenhill commented 6 years ago

Please provide your configuration

Imran-Wahab commented 6 years ago

@edenhill :Thanks for responding quickly.

Following is the producer configuration:

kafka.acks:all
kafka.linger.ms:0
kafka.num.put.retries:15
kafka.key.serializer:org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer:org.apache.kafka.common.serialization.ByteArraySerializer
kafka.partitioner:org.apache.kafka.clients.producer.internals.DefaultPartitioner
kafka.replace.classloader:true
# set this to true if you don't want any writes to kafka. purpose is to determine time taken without kafka in the processing thread
kafka.no.puts:false
# use -1 to disable logging. logging can tell you how much time it takes to process x number of messages if x is given as the frequency
kafka.log.frequency:10000
kafka.send.buffer.bytes:817889280
kafka.request.timeout.ms:1500000
kafka.compression.type:snappy
kafka.batch.size:0
kafka.max.block.ms:30000
kafka.max.in.flight.requests.per.connection:1
kafka.retry.backoff.ms:60000
kafka.metadata.max.age.ms:60000
kafka.reconnect.backoff.max.ms:100000
kafka.reconnect.backoff.ms:30000

Also, by changing some of the parameter values the test sometimes passes, but not consistently. Only with kafka.num.put.retries:4000 does the test pass.

Note : There is some property name mappings with our configuration and kafka producer.

Do, let me know if you require more information.

edenhill commented 6 years ago

That looks like Java Producer configuration. Are you seeing an issue with the .NET client?

Imran-Wahab commented 6 years ago

@edenhill : This issue i am facing is with Java Producer.

edenhill commented 6 years ago

@Imran-Wahab This is for confluent-kafka-dotnet issues only, for the Java clients I suggest the official documentation and the kafka-users mailing list.

SauravChe1905 commented 6 years ago

@edenhill Is there an option to store the unsent messages locally (on the file system) if the KAFKA broker is unavailable?

edenhill commented 6 years ago

@SauravChe1905 There is not, you would need to do that yourself.

jlisam commented 6 years ago

hi 👋 , w.r.t https://github.com/confluentinc/confluent-kafka-dotnet/issues/375#issuecomment-351213598, are there any updates ?

edenhill commented 6 years ago

@jlisam This was released with the v0.11.4 release

mhowlett commented 6 years ago

resolved in 0.11.4

mhowlett commented 6 years ago

(librdkafka PR: edenhill/librdkafka#1571)