Closed mhowlett closed 7 years ago
Well, it's not a memory leak here (memory that is never freed, which would eventually use up all memory and end in an OutOfMemory exception) but an AccessViolation, which is much worse: we are writing to invalid memory. I suspect we are releasing memory we shouldn't, because it is still in use by librdkafka.
@chiru1205 Memory usage would be normal: as long as messages don't time out (default: 5 min), we won't release any memory, since the broker may come back up and we could then send the messages. The default is very high; reducing it might be better in prod to avoid huge memory usage in this case.
If you want, you can register for the error event and check for Local_AllBrokersDown. When this happens, you can stop sending messages yourself. But when there is a temporary network failure (10s or so), in most cases we don't want to stop sending, just wait a little, and you can configure this time with message.timeout.ms (300000 by default).
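A minimal sketch of the error check described above, assuming the 0.11.x-era Confluent.Kafka API in which `Producer` is built from a config dictionary and exposes an `OnError` event (later client versions configure this differently). The `allBrokersDown` flag and the broker address are hypothetical names introduced only for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

class Program
{
    // Hypothetical flag: 1 once the client has reported that all brokers are down.
    static int allBrokersDown = 0;

    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" } // placeholder address
        };

        using (var producer = new Producer(config))
        {
            // Assumption: 0.11.x-style error event carrying an Error with a Code.
            producer.OnError += (_, error) =>
            {
                if (error.Code == ErrorCode.Local_AllBrokersDown)
                {
                    Interlocked.Exchange(ref allBrokersDown, 1);
                }
            };

            // Application code would check the flag before queueing more messages.
            if (Volatile.Read(ref allBrokersDown) == 1)
            {
                Console.WriteLine("All brokers down; pausing sends.");
            }
        }
    }
}
```

When and how to clear the flag again is left to the application, since the right policy depends on the trade-offs discussed in this thread.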
Other mechanisms are possible, but the right behavior is more application-specific than you might think (do not send anything, keep ordering, store in permanent storage, allow no more than x messages, etc.).
Also, you can define a max size for queued messages: queue.buffering.max.kbytes (default 4000000, so 4 GB). Once this limit is reached, the producer will either throw an exception or block (blocking is the default; there is a parameter for this in one of the ProduceAsync overloads if you want to throw, which I would advise in your case).
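As an illustration, a lower cap could be set in the same dictionary style of configuration used in this thread. This is only a sketch: the values and the broker address are arbitrary examples, not recommendations.

```csharp
using System.Collections.Generic;

// Illustrative producer configuration; values are examples only.
var config = new Dictionary<string, object>
{
    { "bootstrap.servers", "localhost:9092" },    // placeholder address
    { "queue.buffering.max.kbytes", 1000000 },    // cap the local queue at roughly 1 GB instead of the 4000000 default
    { "queue.buffering.max.messages", 100000 }    // the default message-count limit mentioned in this thread
};
```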
@treziac you are correct. I don't think it's a memory leak. Here is some more information: I noticed that this issue happens (at least in my case) when queue.buffering.max.messages is increased to 2 million messages and each of my messages is 2398 bytes. That's a lot of data, and there is not enough memory to accommodate the queue. Here are the configuration parameters I'm using:
{"bootstrap.servers", "test"},
{"delivery.report.only.error", true},
{"queue.buffering.max.messages", 2000000},
{"queue.buffering.max.ms", 100},
{"message.send.max.retries", 3},
{"debug", "all"},
{"client.id", Dns.GetHostName()},
{
    "default.topic.config", new Dictionary<string, object>
    {
        {"acks", "all"}, // updated to support leader & follower acknowledgments
        {"message.timeout.ms", 3000}
    }
}
Changing the setting back to the default value (100000 for queue.buffering.max.messages) worked without any exceptions. I ran a test with 6 million messages and all 6 million error messages came through. I may have to play with queue.buffering.max.ms to keep up with send requests.
Well, didn't see your message while editing :) Thanks for the feedback!
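As a rough check on the numbers above, assuming every queued message carries the full 2398-byte payload and ignoring any per-message overhead inside librdkafka (which would only add to the total), a full queue works out to approximately:

```latex
\begin{align*}
2\,000\,000 \times 2398\ \text{bytes} &= 4\,796\,000\,000\ \text{bytes} \approx 4.8\ \text{GB} \\
100\,000 \times 2398\ \text{bytes} &= 239\,800\,000\ \text{bytes} \approx 240\ \text{MB}
\end{align*}
```

So the 2 million message limit allows nearly 5 GB of payload to accumulate while the broker is unreachable, whereas the default limit caps it at about 240 MB.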
But there are indeed problems with delivery.report.only.error and the Task ProduceAsync method.
I have removed that setting completely from the configuration and am using a non-serializing producer.
The growth in memory usage was initially worrying, but with the numbers mentioned above I don't think there is a problem.
The access violation seems likely to be caused by an OOM error which isn't being checked for in librdkafka (and arguably shouldn't be: https://stackoverflow.com/questions/763159/should-i-bother-detecting-oom-out-of-memory-errors-in-my-c-code).
I have increased my message set size to send more messages. Changing it to 5 MB achieved more throughput. I think this issue can be closed at this point.
from #180, @chiru1205 notes:
I have noticed another memory leak issue this morning while testing a disconnect scenario. Steps to recreate the issue:
Shut down the Kafka broker (in my case, a local one). Use a non-serializing producer with a Delivery Handler. Send thousands of messages to see if the Delivery Handler can handle the error logging.
I received error messages as expected, but memory was pushed to the limit and I finally got the access violation exception. Here is a screenshot of the memory leak: [image]. I think there should be some kind of mechanism to halt message send requests if the broker is not connected or is down.