Opteo / google-ads-api

Google Ads API client library for Node.js
https://opteo.com
MIT License

Back-pressure in reportStream #257

Closed leopragi closed 3 years ago

leopragi commented 3 years ago
const stream = customer.reportStream({
  entity: "ad_group_ad",
  attributes: [
    ...
  ],
});

for await (const row of stream) {

}

Let's say that for every 5000 rows I'm trying to do some time-consuming task. I assume the stream will keep feeding rows into your accumulator in the library code, so how is back-pressure being handled here? I checked the code and it seems there is no stream.pause or stream.resume.

wcoots commented 3 years ago

Hi @leopragi. To handle asynchronous code while using reportStream you will have to use stream.next() to retrieve individual rows. See here for more information on generators and iteration.

e.g.

let done = false
const stream = customer.reportStream({
    entity: 'ad_group_ad',
    attributes: [ ... ],
})

while (!done) {
    // next() resolves to an iterator result of the shape { value, done }
    const result = await stream.next()
    if (result.value) {
        await doSomething(result.value)
    }
    done = result.done || false
}
leopragi commented 3 years ago

Ok, but the for await form already allows me to handle asynchronous code as well:

const BATCH_SIZE = 40000
let batch = []
for await (const row of stream) {
  batch.push(row)
  if (batch.length >= BATCH_SIZE) {
    await doSomething(batch);    // sleeps 5 mins
    batch.length = 0
  }
}
if (batch.length) {
  await doSomething(batch);      // flush the final partial batch
}

Let's say doSomething (I used a sleep to emulate that time-consuming task) takes 5 minutes per batch. With both approaches, I'm seeing the Node.js process memory occasionally spike from 300 MB to 1.5 GB.

Does reportStream keep accumulating data behind the scenes without pausing, and is that what's causing memory to go up?

To note: I also tried the raw stream method and manually paused and resumed the stream, and that also spiked memory to 1.5 GB, which is weird.

Zikoel commented 3 years ago

@leopragi I'm interested in this topic too! My accounts are becoming very big and I want to implement something similar. A question for you: is it enough for the Node garbage collector that you set batch.length = 0 for the memory to be freed? Is it possible that the spike you see appears because you think you are freeing the memory of the old batch, but something is actually still retaining that data?

wcoots commented 3 years ago

Yes, this will be a consequence of pausing reportStream for any amount of time; the remaining results are still being piped in and amassed. Therefore, in this situation I would recommend using reportStreamRaw, which allows you to handle the data events manually at your own rate. See here for an example.
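
e.g. a rough sketch of that pattern (I'm assuming reportStreamRaw takes the same options shape as reportStream and emits the usual Node stream data/end/error events, and doSomething stands in for the slow task):

const rawStream = await customer.reportStreamRaw({
    entity: 'ad_group_ad',
    attributes: ['ad_group_ad.ad.id'],
})

await new Promise((resolve, reject) => {
    rawStream.on('data', async (chunk) => {
        rawStream.pause()            // stop the flow while the slow work runs
        try {
            await doSomething(chunk)
        } catch (err) {
            return reject(err)
        }
        rawStream.resume()           // ask for the next chunk
    })
    rawStream.on('end', resolve)
    rawStream.on('error', reject)
})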

leopragi commented 3 years ago

@Zikoel

Regarding the garbage collector: I hoped batch.length = 0 would at least free up some memory; in my case it didn't. I don't think the data is being retained anywhere, since in the library code I see item = accumulator.shift(). I am about to check this.

But as @WillCooter said, you can use reportStreamRaw to handle the data flow yourself. Something like:

let stream = await customer.reportStreamRaw(...)
stream.on('data', async(chunk) => {
  stream.pause()
  await longTask(chunk)
  stream.resume()
})

@WillCooter what is really weird is that this is also spiking the memory.

@Zikoel
Working playground code:

const response = await axios({
  method: 'POST',
  url: 'https://googleads.googleapis.com/v6/customers/<CUSTOMER_ID>/googleAds:searchStream',
  headers: {
    ...
  },
  responseType: 'stream',
  data: {
    query: `SELECT ad_group_ad.ad.id FROM ad_group_ad`
  }
})
let stream = response.data;
stream.on('data', async(chunk) => {
  stream.pause()
  await longTask(chunk)
  stream.resume()
})

I don't see any spikes here! 🤔 It just stays at 103 MB.

Zikoel commented 3 years ago

@leopragi I'm searching for a way to reproduce the spike! Can I ask how you measure the memory consumption? For me, the only solution that works so far is to stream all the data into a file as fast as possible (building an index of the interesting data during that pass) and afterwards read the file back at a calmer pace, processing it in small chunks (a lot of complexity, I know).
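
Roughly what I do, as a sketch (the file path, batch size and processBatch are placeholders, and the indexing part is omitted):

const fs = require('fs')
const readline = require('readline')

// Phase 1: drain the report stream to disk as fast as possible, one JSON row per line.
async function dumpToFile(stream, path) {
  const out = fs.createWriteStream(path)
  for await (const row of stream) {
    if (!out.write(JSON.stringify(row) + '\n')) {
      await new Promise(resolve => out.once('drain', resolve)) // respect the file's back-pressure
    }
  }
  out.end()
}

// Phase 2: re-read the file line by line and process small batches at our own pace.
async function processFile(path, batchSize, processBatch) {
  const rl = readline.createInterface({ input: fs.createReadStream(path) })
  let batch = []
  for await (const line of rl) {
    batch.push(JSON.parse(line))
    if (batch.length >= batchSize) {
      await processBatch(batch)
      batch = []
    }
  }
  if (batch.length) await processBatch(batch)
}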

leopragi commented 3 years ago

@Zikoel I'm just measuring it with the Activity Manager. You should be able to reproduce it with this:

let stream = await customer.reportStreamRaw(...)
stream.on('data', async(chunk) => {
  stream.pause()
  await longTask(chunk)
  stream.resume()
})
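
If you want a number from inside the process rather than from the Activity Manager, logging process.memoryUsage() on an interval is enough to see the spike:

// Log RSS and heap usage every few seconds while the stream is being consumed.
const memoryLogger = setInterval(() => {
  const { rss, heapUsed } = process.memoryUsage()
  console.log(`rss=${Math.round(rss / 1048576)} MB, heapUsed=${Math.round(heapUsed / 1048576)} MB`)
}, 5000)

// remember to clearInterval(memoryLogger) once the stream has finished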

I also tried your approach before, where I read the data, save it as a CSV to a local file, and process it from there. But my use case is different: I need to load the rows into a data warehouse, so I'm streaming them to S3 and running a COPY command in the DB.

Please note that if you pause this pipeline, the memory won't spike and will stay put.

Ex:

let pass = new stream.PassThrough();
return adsAxiosInstance({
  method: 'POST',
  url: `/googleAds:searchStream`,
  responseType: 'stream',
  data: { query }
}).then(async response => {
  response.data
    .pipe(parser)
    .pipe(transform)
    .pipe(json2csv)
    .pipe(pass)
  return upload(outputPath, pass); //this is a Promise
});
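
For anyone wondering, upload is just a helper that streams the PassThrough to S3; with the AWS SDK v3 it can look something like this (the bucket, region and key names are placeholders):

const { S3Client } = require('@aws-sdk/client-s3')
const { Upload } = require('@aws-sdk/lib-storage')

// Streams the incoming PassThrough straight to S3 instead of buffering it in memory.
function upload(key, body) {
  const s3 = new S3Client({ region: 'us-east-1' })  // placeholder region
  const multipartUpload = new Upload({
    client: s3,
    params: { Bucket: 'my-staging-bucket', Key: key, Body: body },  // placeholder bucket
  })
  return multipartUpload.done()  // resolves once the whole stream has been uploaded
}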
leopragi commented 3 years ago

@WillCooter

I believe the accumulator in the library is still holding the records. When I use reportStreamRaw, memory climbs from 300 MB to 2.5 GB, whereas when I use the REST API call it stays around 500 MB. Can you please take a look at this?

@Zikoel in case you are interested: for fetching very large data sets I had to use REST until this issue is fixed.

const stream = require('stream')
const JSONStream = require('JSONStream')

async function* _getTableDataRaw(axiosInstance, query) {
  const rawStream = await axiosInstance({
    method: 'POST',
    url: '/googleAds:searchStream',
    responseType: 'stream',
    data: { query }
  })
  const pass = new stream.PassThrough({ objectMode: true })
  // Each streamed response object has a `results` array; emit those arrays one by one.
  const gstream = rawStream.data
    .pipe(JSONStream.parse([/.*/, `results`]))
    .pipe(pass)
  for await (const chunk of gstream) {
    gstream.pause()
    yield chunk
    gstream.resume()
  }
  return []
}
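
Usage looks something like this (doSomething is a placeholder for the slow work; each chunk yielded by the generator is the results array of one streamed response):

const BATCH_SIZE = 5000

async function run(axiosInstance, query) {
  let batch = []
  for await (const results of _getTableDataRaw(axiosInstance, query)) {
    batch.push(...results)               // rows from one streamed response
    if (batch.length >= BATCH_SIZE) {
      await doSomething(batch)           // the slow part, with nothing piling up behind it
      batch = []
    }
  }
  if (batch.length) await doSomething(batch)  // flush the final partial batch
}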