theburningmonk / ReactoKinesix

An Rx-based .NET client library for working with Amazon Kinesis
http://theburningmonk.github.io/ReactoKinesix/
MIT License

Memory leak when processing large amounts of messages #52

Closed · khanage closed this issue 9 years ago

khanage commented 9 years ago

Firstly, apologies for the lack of information; this is a shot in the dark that I'll follow up with a bit more detail.

If I run a basic test like the following:

        [Test]
        public void RawApi_Test()
        {
            // Run the client against the stream for two minutes and observe memory usage.
            var region = RegionEndpoint.APSoutheast2;
            var appName = "RawApi_Test";
            var streamName = "LargeStream";

            var processorFactory = new ProcessorFactory();

            using (var app = ReactoKinesixApp.CreateNew(kinesisReadonlyKey, kinesisReadonlySecret, region, appName, streamName, "PHANTOM", processorFactory))
            {
                app.StartProcessing("01");
                Thread.Sleep(TimeSpan.FromMinutes(2));
                app.StopProcessing("01");
            }
        }

        // Creates a new processor instance for each shard the app consumes.
        public class ProcessorFactory : IRecordProcessorFactory
        {
            public IRecordProcessor CreateNew(string shardId)
            {
                return new RecordProcessor();
            }
        }

        // Minimal processor: writes each record's payload to the console and reports success.
        public class RecordProcessor : IRecordProcessor
        {
            public ProcessRecordsResult Process(string shardId, Record[] records)
            {
                foreach (var record in records)
                {
                    Console.WriteLine("Got record: {0}", Encoding.UTF8.GetString(record.Data));
                }

                return new ProcessRecordsResult(Status.Success, true);
            }

            public void OnMaxRetryExceeded(Record[] records, ErrorHandlingMode errorHandlingMode)
            {

            }

            public void Dispose()
            {

            }
        }

And run a memory profiler over it, I find that the Record[] instances are not being collected. I can provide a more comprehensive test case tomorrow if that will help us track down the issue.

theburningmonk commented 9 years ago

Thanks, I'll take a look. Let me know if you have any other details you're able to share that would be helpful.

khanage commented 9 years ago

It seems to me that it's not a memory leak, but rather that the stream I'm processing produces messages faster than I can process them. I wrote some projects to reproduce this, but I was unable to generate enough messages to trigger the problem.

What I can see, though, is that in Client.fs, fetchNextRecords just gets called recursively, without any feedback from the consumer.

I think I have two options here: introduce some feedback about messages being processed and wait for it before starting the next getRecords request, or change the way that I process these messages. Do you have any other suggestions?
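A rough sketch of what that first option could look like, in C# terms rather than the library's actual F# internals (FetchNextBatchAsync and ProcessBatchAsync below are placeholder names, not real API): gate each new GetRecords call on a signal that the previous batch has finished processing.

    using System.Threading;
    using System.Threading.Tasks;
    using Amazon.Kinesis.Model;

    // Conceptual sketch only: the fetch loop waits for feedback from the
    // processing side before issuing the next GetRecords request.
    public class GatedFetchLoop
    {
        private readonly SemaphoreSlim batchProcessed = new SemaphoreSlim(1, 1);

        public async Task RunAsync(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                // Wait until the previous batch has been fully processed.
                await batchProcessed.WaitAsync(token);

                var batch = await FetchNextBatchAsync(token);

                // Process in the background; releasing the semaphore is the
                // feedback that allows the next fetch to start.
                var processing = ProcessBatchAsync(batch, token)
                    .ContinueWith(t => batchProcessed.Release());
            }
        }

        // Placeholders standing in for the real fetch and dispatch steps.
        private Task<Record[]> FetchNextBatchAsync(CancellationToken token) =>
            Task.FromResult(new Record[0]);

        private Task ProcessBatchAsync(Record[] batch, CancellationToken token) =>
            Task.Delay(0, token);
    }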

theburningmonk commented 9 years ago

That is possible given the way it works right now: the fetch happens eagerly, as soon as the last batch is received, so if you're slow to process the current batch it's possible for records to build up whilst they're queued up for the processor.

Taking feedback from the processing loop into account via the nextBatch signal (https://github.com/theburningmonk/ReactoKinesiX/blob/develop/src/ReactoKinesiX/Client.fs#L651-L653) feels like the right thing to do here. Ideally it would still fetch the next batch whilst you're processing the current one, so that by the time you finish, the next batch is already available for processing.
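In the same placeholder C# terms as the sketch above (again, not the library's internals), the pipelined version would start the next fetch while the current batch is being processed and wait for both before moving on:

    using System.Threading;
    using System.Threading.Tasks;
    using Amazon.Kinesis.Model;

    // Conceptual sketch of "fetch the next batch while processing the current one".
    public class PipelinedFetchLoop
    {
        public async Task RunAsync(CancellationToken token)
        {
            var currentBatch = await FetchNextBatchAsync(token);

            while (!token.IsCancellationRequested)
            {
                // Fetch ahead and process the current batch in parallel...
                var nextFetch  = FetchNextBatchAsync(token);
                var processing = ProcessBatchAsync(currentBatch, token);

                // ...but never run more than one batch ahead, so unprocessed
                // records cannot keep accumulating in memory.
                await Task.WhenAll(nextFetch, processing);

                currentBatch = await nextFetch;
            }
        }

        // Placeholders standing in for the real GetRecords call and the record processor.
        private Task<Record[]> FetchNextBatchAsync(CancellationToken token) =>
            Task.FromResult(new Record[0]);

        private Task ProcessBatchAsync(Record[] batch, CancellationToken token) =>
            Task.Delay(0, token);
    }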

This is a good point, thank you for pointing it out.

Out of curiosity, what are you using to send records to Kinesis? Given the way throttling works in Kinesis, you might have hit the throttling limits when you were trying to generate load to reproduce this. I wrote another library (https://github.com/theburningmonk/darkseid) for the producer side of things, which handles things like auto-splitting shards and lets you choose between sending data to Kinesis eagerly or in the background (background mode carries the risk of losing buffered data, which in some of our cases is a reasonable trade-off for client-facing latency).
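For context on the throttling point: a naive load generator like the sketch below, driving PutRecord directly against a single shard, gets throttled with ProvisionedThroughputExceededException once it exceeds the shard's write limit of 1,000 records or 1 MB per second, which could explain why the reproduction attempt couldn't generate enough messages. The stream name and payload here are made up.

    using System;
    using System.IO;
    using System.Text;
    using Amazon;
    using Amazon.Kinesis;
    using Amazon.Kinesis.Model;

    // Naive producer for illustration only: a single shard accepts at most
    // 1,000 records or 1 MB per second before PutRecord calls are throttled.
    public static class NaiveProducer
    {
        public static void Run(string accessKey, string secretKey)
        {
            using (var kinesis = new AmazonKinesisClient(accessKey, secretKey, RegionEndpoint.APSoutheast2))
            {
                for (var i = 0; i < 100000; i++)
                {
                    var payload = Encoding.UTF8.GetBytes("message " + i);
                    kinesis.PutRecord(new PutRecordRequest
                    {
                        StreamName   = "LargeStream",            // stream name borrowed from the test above
                        PartitionKey = Guid.NewGuid().ToString(), // random keys spread records across shards
                        Data         = new MemoryStream(payload)
                    });
                }
            }
        }
    }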


tomhall commented 9 years ago

Hi @theburningmonk, the approach you mention above sounds like it could help with a situation I'm seeing where our Windows service (using ReactoKinesiX) consumes more and more memory over time. This graph shows the Windows service's memory usage over a couple of days:

[Image: ReactoKinesiX memory usage over a couple of days]

Here are our shard Get Requests for that period of time:

[Image: AWS Kinesis Get Requests for the same period]

Any ideas/pointers appreciated.

theburningmonk commented 9 years ago

Can you have a look at the CloudWatch metrics under the ReactoKinesiX namespace? I'd be interested to see the Fetched vs Processed counts, to see whether there's an imbalance there.
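If it's easier to pull those numbers programmatically than from the console, something along these lines should work; the ReactoKinesiX namespace comes from the comment above, but the exact metric names ("Fetched", "Processed") and any dimensions should be verified in CloudWatch.

    using System;
    using System.Collections.Generic;
    using Amazon.CloudWatch;
    using Amazon.CloudWatch.Model;

    // Sketch: sum a metric published by the library over the last 24 hours so
    // that Fetched and Processed counts can be compared side by side.
    public static class ConsumerMetrics
    {
        public static double SumLastDay(IAmazonCloudWatch cloudWatch, string metricName)
        {
            var response = cloudWatch.GetMetricStatistics(new GetMetricStatisticsRequest
            {
                Namespace  = "ReactoKinesiX",
                MetricName = metricName,                  // e.g. "Fetched" or "Processed"
                StartTime  = DateTime.UtcNow.AddDays(-1),
                EndTime    = DateTime.UtcNow,
                Period     = 3600,                        // hourly datapoints
                Statistics = new List<string> { "Sum" }
            });

            var total = 0.0;
            foreach (var point in response.Datapoints)
            {
                total += point.Sum;
            }
            return total;
        }
    }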


tomhall commented 9 years ago

The spikes on the graph are from when we started our worker processor and it got through a backlog of Kinesis records all at once. None of our records are being successfully processed; we're aware of this. The memory usage is definitely an interesting one. I'm considering enabling server mode for the garbage collector to see if that helps.
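For what it's worth, server GC is switched on with the gcServer element under runtime in the service's app.config; a quick runtime check that the setting actually took effect:

    using System;
    using System.Runtime;

    // Quick check that the <gcServer enabled="true"/> app.config setting took effect.
    public static class GcModeCheck
    {
        public static void Log()
        {
            Console.WriteLine("Server GC: {0}, latency mode: {1}",
                GCSettings.IsServerGC, GCSettings.LatencyMode);
        }
    }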

theburningmonk commented 9 years ago

I pushed a new version, v0.4.0, which should address this behaviour: it now waits until the last batch has been processed before fetching the next one. In this version you can also specify MaxBatchSize via configuration, so if you don't want to use the Kinesis default of 10,000 you can make the batches smaller. Keep in mind the rate of processing and the maximum number of transactions per second on the stream, which is calculated based on the number of shards.
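Applied to the test at the top of the thread, that could look roughly like the sketch below. The ReactoKinesixConfig type name and the CreateNew overload accepting it are assumptions here (the snippet also reuses the credential fields from the test above), so check the library for the exact shape; MaxBatchSize is the setting referred to in the comment.

    // Sketch only: assumes a ReactoKinesixConfig type with a settable MaxBatchSize
    // and a CreateNew overload that accepts it; verify the exact names and signature.
    var config = new ReactoKinesixConfig();
    config.MaxBatchSize = 1000; // instead of the Kinesis maximum of 10,000 per GetRecords call

    using (var app = ReactoKinesixApp.CreateNew(
        kinesisReadonlyKey, kinesisReadonlySecret, RegionEndpoint.APSoutheast2,
        "RawApi_Test", "LargeStream", "PHANTOM", new ProcessorFactory(), config))
    {
        app.StartProcessing("01");
        Thread.Sleep(TimeSpan.FromMinutes(2));
        app.StopProcessing("01");
    }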

Please let me know if this addresses the issue you guys are seeing.

theburningmonk commented 9 years ago

Hi, is this issue fixed for everyone? If so, I'll close the ticket.