avoxm closed this issue 8 years ago
It looks like the issue is here : https://github.com/ntent-ad/kafka4net/blob/692dd1c902d754537113ba372568296078ee2c8f/src/Producer.cs#L557
We have been running significant producer load on this library in production stably for weeks without any sustained memory growth.
Is your data stream very bursty? I.e., do you often batch a million or so entries, then just sit around and wait for a while?
If that is the case, I suspect what might be happening is that the circular buffers used to hold messages prior to sending are auto-growing but then not shrinking back down.
The memory used for this is not "leaked", though: as you keep using the system it will grow only as much as is needed, and as the buffer drains, the items no longer needed in it will be released.
To test for a true "leak" you need to run over a longer time, I think. Try sending 1 million messages, wait for them to send, then send another million, wait, then another million, and so on.
Hah, yes, overlapping comments. It is just as I suspected. The producer and its circular buffer were designed largely around a consistent stream of messages. I am fairly certain that if you repeat your test and measure across, say, half a dozen batches (of whatever size), you will see memory growth constrained by the maximum size needed to buffer a single batch.
Yes, we process data in batches with some pause in between. The thing is that the buffer keeps growing little by little. For example, after 10 batches of 100K messages (each message is 2KB) it holds ~1.8GB of memory.
And the question is why it is still holding that memory after closing the producer by calling CloseAsync. Isn't it supposed to release the memory?
It looks like _allPartitionQueues should be set to null in CloseAsync. Otherwise it seems that memory consumption will keep accumulating.
Here is the scenario for my batch processing :
So each producer will consume memory for that buffer, which will keep accumulating.
You may not have this issue because you do not process data in batches and have only one producer that streams data.
Do you know for certain that your throughput is sufficient to send all messages in one batch before the next one starts?
If you cannot keep up, then obviously the buffer must grow because you are adding another batch while there are still some messages left in the existing batch.
In the circular buffer implementation, in the case of a reference type, a dequeued slot still holds a reference to its element.
I agree with you that we could better clean up the used buffers when the producer is no longer needed or used. I do not really want this added to CloseAsync, though, and would rather use the IDisposable pattern to release the buffer(s). Then, if you want to create a new producer for every batch (I would advise against this), you can, as long as you dispose the old one. That said, if you are already de-referencing the entire producer between batches, it should release all of its references for GC at that time.
You mention "each producer". Are you creating a new producer for every batch?
After you close the producer, are you correctly releasing all handles to the producer? If so, the producer and all of its resources should be available for garbage collection once you have no handles left to the producer.
@avoxm It looks like _allPartitionQueues should be set to null in CloseAsync
I'm not sure how this is possible. If you create and dispose producers, then GC should free the producer's memory, and thus _allPartitionQueues too. How is it possible that it is not released?
BTW, I would highly suggest using a single producer for each topic and writing a shim to access a producer across batches. This will avoid a LOT of thrashing (new connections, new buffers, fetching metadata, etc)
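A minimal sketch of such a shim, assuming only that a connected producer is obtained through some async connect step. The pool type below is invented for illustration and is not part of kafka4net; the connect delegate would wrap `new Producer(...)` plus `await producer.ConnectAsync()`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical shim: keeps one connected producer per topic and reuses it
// across batches, instead of creating and closing a producer per batch.
// TProducer stands in for kafka4net's Producer type.
class SharedProducerPool<TProducer>
{
    readonly Func<string, Task<TProducer>> _connect;
    readonly ConcurrentDictionary<string, Task<TProducer>> _byTopic =
        new ConcurrentDictionary<string, Task<TProducer>>();

    public SharedProducerPool(Func<string, Task<TProducer>> connect)
    {
        _connect = connect;
    }

    // The first caller for a topic triggers the connect; later callers await
    // the same Task, so each topic gets exactly one producer instance.
    public Task<TProducer> GetAsync(string topic) =>
        _byTopic.GetOrAdd(topic, _connect);
}
```

Batches would then call `GetAsync(topic)` and `Send` on the result, and the producer would be closed once, at application shutdown, avoiding repeated connection setup, buffer allocation, and metadata fetches.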
Yes to the first question. Batches are processed synchronously; the next batch will be processed only after await CloseAsync. The fact that the buffer needs to grow is fine; the problem is that it is not being cleaned up after the producer is closed.
@vchekan I'm not sure yet how that is possible, but you can simulate the scenario I have described and attach a memory profiler to get a more detailed picture.
@thunderstumpges when you say "are you correctly releasing all handles to the producer?", what do you mean? Is there anything more to be done besides CloseAsync?
As for creating a new producer, I was thinking of using only one producer, but the nature of the application requires batch processing, and the events that trigger it can be random.
It seems there is some not-so-nice behavior in CircularBuffer.Get: an element taken from the queue is not assigned null, so a reference to the message remains and prevents the sent message from being garbage collected until the circular buffer makes a full circle and overwrites it. While not really a leak, it can be a red herring when using a memory profiler. I'll fix it within a couple of days. But if you can produce a unit test, it would speed things up.
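The described fix can be sketched as follows. This is not kafka4net's actual CircularBuffer implementation, just a minimal illustration of clearing the slot on dequeue so the dequeued message becomes eligible for GC immediately rather than when the buffer wraps around:

```csharp
using System;

// Minimal fixed-capacity circular buffer for reference types, illustrating
// the fix: null out the slot on Get so the buffer drops its reference.
class SimpleCircularBuffer<T> where T : class
{
    readonly T[] _slots;
    int _head, _tail, _count;

    public SimpleCircularBuffer(int capacity) { _slots = new T[capacity]; }

    public void Put(T item)
    {
        if (_count == _slots.Length) throw new InvalidOperationException("buffer full");
        _slots[_tail] = item;
        _tail = (_tail + 1) % _slots.Length;
        _count++;
    }

    public T Get()
    {
        if (_count == 0) throw new InvalidOperationException("buffer empty");
        var item = _slots[_head];
        _slots[_head] = null;   // the fix: release the reference held by the slot
        _head = (_head + 1) % _slots.Length;
        _count--;
        return item;
    }

    // Exposed for demonstration: how many slots still reference an object.
    public int LiveReferences()
    {
        int n = 0;
        foreach (var s in _slots) if (s != null) n++;
        return n;
    }
}
```

Without the `_slots[_head] = null` line, every dequeued message would remain reachable from the array until overwritten, which is exactly what inflates a heap snapshot.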
Handles to the producer would be variables, fields, or other references preventing garbage collection. If you are not re-using the same producer instance then the buffer growth would only ever happen for one batch and the buffer for the next batch would be in an entirely new producer.
I think the best way to proceed would be for you to write a unit test that demonstrates your issue, so we can see how you're using the producer(s) and cluster(s) across batches.
In CloseAsync there's this code:

_sendMessagesSubject = null;
OnPermError = null;
OnShutdownDirty = null;
OnSuccess = null;
OnTempError = null;
I was thinking of adding _allPartitionQueues = null as well. It still needs to be checked, but it is possible that some event or anonymous function is keeping a reference to it, and that's why GC is not marking it for cleanup.
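As an illustration of that suspicion (the Bus and Subscriber types below are invented, not kafka4net code): a live event subscription holds a reference to its handler's target, so an object can stay reachable after being "closed" unless the event is cleared, which is what the null assignments in CloseAsync do for the producer's own events.

```csharp
using System;

// Invented long-lived event source, standing in for anything the producer's
// internals might be subscribed to.
static class Bus
{
    public static event Action OnMessage;

    public static bool HasSubscribers => OnMessage != null;

    public static int SubscriberCount =>
        OnMessage == null ? 0 : OnMessage.GetInvocationList().Length;

    // Analogous to "OnSuccess = null;" etc.: dropping the delegate releases
    // the references it held to subscriber objects.
    public static void Clear() => OnMessage = null;
}

class Subscriber
{
    public Subscriber() { Bus.OnMessage += Handle; } // Bus now references this instance
    void Handle() { }
}
```

As long as `Bus.OnMessage` holds the delegate, the Subscriber instance (and everything it references) cannot be collected, even if no other code holds a handle to it.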
Sure, I will create unit tests.
Meanwhile, could you please tell me when the buffer shrinks? Let's say throughput was not keeping up with my send requests and the buffer grew; how and when will it release the memory?
Here is the sample code to simulate the problem :
var topic = "yourTopic";
for (int i = 0; i < 10; ++i)
{
    var producer = new Producer(seed2Addresses,
        new ProducerConfiguration(topic));
    await producer.ConnectAsync();
    for (int j = 0; j < 1000000; j++)
    {
        var msg = new Message()
        {
            Key = BitConverter.GetBytes(1),
            Value = Encoding.UTF8.GetBytes(String.Format(@"SomeLongText - {0}", j))
        };
        producer.Send(msg);
    }
    await producer.CloseAsync(TimeSpan.FromSeconds(60));
}
@avoxm Thanks for the code. It allowed me to verify your use case quickly.
I was able to verify that the 10 Producers are somehow surviving GC. Together with the bug in the circular queue not releasing items after dequeuing, this increases memory consumption. Fixing the CircularBuffer bug decreased memory consumption from 43MB to 5MB. I've committed the fix (code only, did not regenerate the nuget) and will investigate why the 10 Producers survive.
It looks like there is a memory leak. The issue is that after sending a big chunk of data, kafka4net still keeps a reference to some data, which prevents the memory from being collected.
This is quite easy to reproduce.
You can also run a memory profiler to get more insight.