Closed mgaylord-branch closed 4 years ago
Hi,
Sorry for the long answer - looking at the code it's not necessarily obvious what happens there, but my assumption would be that because of the rather big batch (5000 events) the resident size of the chunks is that high. The reason for this is that you create an array of strings that you then join and split again in your map function.
What I would recommend is to use scramjet to parse the csv using the StringStream.CSVParse
method and work on a smaller batch like this:
var counter = 0
var header: string
await StringStream.from(readableStream, { maxParallel: 2 })
.CSVParse({
delimiter: ',',
header: true,
skipEmptyLines: true,
worker: true,
step: function(results) {
console.log("Row:", results.data);
}
})
.batch(5000)
.map(async function(results: Array<any>) {
console.debug(`results loaded: ${results.length}`)
counter = counter + results.length
// upload to API
await uploadEvents(events) // MC - I'm not sure where the "events" come from...
})
.run()
console.log(`counter: ${counter}`)
This should get you, more or less what you're trying to achieve, in much lower memory usage.
Also, just after .batch(5000)
can you try if there's a difference to set .setOptions({ maxParallel: 2 })
. I have a suspicion we're resetting options on this and couple other commands.
@MichalCz Thanks for following up on this. I tried your suggestion above, but it seems like CSVParse
needs to complete before the batching begins which causes the system to run out of memory.
This is why I had to split on newlines (because batching doesn't break on newlines leading to parse errors), batch the lines into 500 and then parse. I reduced the batch down to 500 in my above code and the process seems to stay within 1024mb of memory regardless of file size.
This should continue to function fine for now but may run out of time for files over 1.5gb (Lambda limitations are 15 minutes). I have moved setting options as suggested after the batch call and will let you know if makes any difference to the time.
Hmm... If you're correct then that would be a major bug...
I tried reproducing this but I still see the backpressure working, so I'm not sure what breaks it in your case. Can you help me out and create some kind of a test case that I could debug on? A simple repo and some guide how to run the example would do. I'd then try to fix or figure out what's going on...
Sure, I'll put something together.
I just tried again with your suggested code and it definitely runs out of memory. Even with very small batches of 500 events.
Any update on this @mgaylord-branch?
@MichalCz I am a bit snowed under at the moment with work. Will see if I can get something out in the next week or two.
Hey Michael, I'm just pinging to keep this alive... Any chance you'd get some example posted?
@mgaylord-branch can you check one thing - try to add this code somewhere in the begining of uploadEvents:
await new Promise(res => process.nextTick(res));
This should be a bit easier for the GC to find some space to kick in if we're not dealing with some reference leak here (which I still suspect in the uploadEvents
)... Anyway if that helps I can propose a new method in scramjet for this.
@MichalCz Thanks for this and sorry for the slow reply, I will take a look at this in my downtime this week coming.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
I am trying to use StringStream to parse very large CSV files into JSON objects (over 100k to 500k rows) on AWS Lambda and then upload them to a third party service. The following is the code I am using:
When I run this code on a file with 100k lines it maxes out my Lambda memory of 1024mb. If I remove the uploadEvents call it maxes out at around 300mb of memory.
My question is: is there a way to make this more memory efficient? Basically, I am thinking that there must be a way to pause the stream, upload the batch, then release the memory and upload the next batch.
I've tried various permutations but have had no luck, so any help here would be greatly appreciated.
Thanks in advance...