jessetane / queue

Asynchronous function queue with adjustable concurrency
MIT License

Queue promise timeouts seem to bloat memory for long-running promises #85

Closed: hanseltime closed this issue 1 year ago

hanseltime commented 2 years ago

Hey,

While using the queue library, I noticed that it doesn't actually wait for a promise to finish when a timeout occurs. I had a really long-running promise (caused by a bug in our code), and memory overflowed. The queue dropped its reference to the timed-out job, and the job kept running in the background while the queue called next() and started the following job.

As a feature request, could we track those promises and either dequeue them or error them out?
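The failure mode described above can be modeled in a few lines. This is a standalone sketch, not the queue library's code. `runWithTimeout` and `inFlight` are hypothetical names. When the timer fires first, the runner calls `next()` immediately and keeps no reference to the job's promise. The job carries on in the background and holds whatever it references until it settles.

```typescript
type Job = () => Promise<void>

// Jobs whose timeout fired but whose promise has not settled yet (hypothetical counter).
let inFlight = 0

// Hypothetical model of a fire-and-forget timeout, not the library's implementation.
// If the timer fires first, `next` is called right away and the runner keeps no
// reference to the job's promise, so the job keeps running in the background.
function runWithTimeout(job: Job, timeoutMs: number, next: () => void): void {
  let timedOut = false
  const timer = setTimeout(() => {
    timedOut = true
    inFlight++
    next() // move on to the next job without waiting
  }, timeoutMs)

  job()
    .catch(() => {}) // this model ignores job errors
    .finally(() => {
      clearTimeout(timer)
      if (timedOut) inFlight-- // the abandoned job finally settled
      else next() // normal completion: advance exactly once
    })
}
```

If every job outlives its timeout, `inFlight` grows by one per job until those jobs settle. The memory they retain grows with it.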

Sleepful commented 2 years ago

@hanseltime by any chance, could you provide sample code that reproduces the issue?

hanseltime commented 2 years ago

A contrived example would be this:

Note: I've added the minimal tracking I could do if the timeout handler were given the job's promise, so that I can count how many timed-out promises are piling up.

import queue from 'queue'

const MAX_LATE_PROMISES = 100 // illustrative safety limit

const q = queue({
  timeout: 200,
})

let numLatePending = 0
q.on('timeout', (next, job) => {
  console.error('job timed out:', job.id)

  if (numLatePending > MAX_LATE_PROMISES) {
    // Throw or poll
  }

  // Since we're calling next() here and the source code (at least as far as I saw) just drops
  // its reference to the job, the next job starts while the async scheduler still holds the last job's promise.
  next()

  // Ideally, the promise would be passed in so I could monitor it myself, or the queue would track it.
  // `jobPromise` is hypothetical: the timeout event does not provide it today.
  numLatePending++
  jobPromise.finally(() => {
    numLatePending--
  })
})

for (let i = 0; i < 10000; i++) {
  q.push(async () => {
    await new Promise<void>((res) => {
      setTimeout(() => {
        res()
      }, 30000) // Contrived: the job hangs because we didn't plan the async work
    })
  })
}

// Start after pushing: with autostart off, start() on an empty queue ends immediately.
q.start()
MaksimLavrenyuk commented 1 year ago

@jessetane

When a job is processed, it is pulled out of the queue's job list, so it has already been "removed" from the queue (index.js:115).

There is no way to cancel a Promise once it has been created. The proposal that would have added this functionality never made it past stage 1: https://github.com/tc39/proposal-cancelable-promises

That means that once a job has started, its Promise executor, and anything the executor schedules, will run anyway:

q.push(async () => {
  await new Promise<void>((res) => {
    setTimeout(res, 30000) // res will still be called: the timer is already scheduled on the event loop
  })
})

For this reason, the queue has no way to cancel a job's Promise when the timeout expires.
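A Promise cannot be cancelled, but a job can be written to cooperate with cancellation. The sketch below uses a different technique from anything the queue provides. The job receives a standard `AbortSignal`, and a hypothetical `runCancellable` helper aborts that signal when the timeout expires. The job's own timer is then cleared, and its promise settles promptly instead of lingering. `runCancellable` and `sleep` are illustrative names. The sketch assumes a runtime with a global `AbortController` (Node 15+ or a browser).

```typescript
// Hypothetical helper: a sleep that rejects early if the signal is aborted.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) return reject(new Error("aborted"))
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort)
      resolve()
    }, ms)
    function onAbort() {
      clearTimeout(timer) // drop the pending timer so nothing lingers
      reject(new Error("aborted"))
    }
    signal.addEventListener("abort", onAbort, { once: true })
  })
}

// Hypothetical runner: aborts the job's signal once `timeoutMs` elapses.
// The job still decides how to react; nothing is forcibly cancelled.
async function runCancellable(
  job: (signal: AbortSignal) => Promise<void>,
  timeoutMs: number,
): Promise<"done" | "aborted"> {
  const controller = new AbortController()
  const timer = setTimeout(() => controller.abort(), timeoutMs)
  try {
    await job(controller.signal)
    return "done"
  } catch (err) {
    if (controller.signal.aborted) return "aborted"
    throw err
  } finally {
    clearTimeout(timer)
  }
}
```

This only helps for jobs that check or listen to the signal. A job that ignores it behaves exactly like the 30-second example above.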

Regarding job tracking, the timeout event already passes the job that timed out, so you can compare it with the function you passed to push().

If jobs run longer than you can tolerate, you can stop processing by calling q.end() inside the timeout listener.

As for passing an index or some other id, I don't see the point. Jobs are retrieved from the array via shift(), so the job will no longer be in q.jobs for you to compare against.
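Because the timeout event hands back the function object that was given to push(), a caller can attach its own ids without help from the queue. A minimal caller-side sketch follows. `jobIds`, `makeJob`, and `describeTimedOut` are hypothetical names, and the lookup relies on the listener receiving the identical function reference.

```typescript
type Job = () => Promise<void>

// Caller-side registry: maps each job function to an id chosen by the caller.
// A WeakMap does not keep finished jobs alive once nothing else references them.
const jobIds = new WeakMap<Job, number>()

// Hypothetical helper: creates a job and records its id before it is pushed.
function makeJob(id: number, work: () => Promise<void>): Job {
  const job: Job = () => work()
  jobIds.set(job, id)
  return job
}

// Intended for use inside a timeout listener: look up the job it received.
function describeTimedOut(job: Job): string {
  const id = jobIds.get(job)
  return id === undefined ? "unknown job timed out" : `job ${id} timed out`
}
```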

Also, I think the timeout event's payload should not include next(), since that is an internal mechanism.

Please add to this or correct me if I am wrong.

hanseltime commented 1 year ago

Hey @MaksimLavrenyuk,

I think this request may be outside the scope of your PR. It is not asking for cancellable promises, since those don't exist in the ES spec. It is asking for the promise to be surfaced on timeout so that the programmer can choose how to handle it. Right now, the timeout event doesn't surface that promise. I can't keep a secondary pool of "long timeout promises" and monitor them properly, because the queue fires and forgets the job.

To handle this edge case completely, I as the developer would have two options. I could throw an error and stop all processing on the first timeout, which removes the need for a next() function on that event. Alternatively, I could keep a "MAX_LONG_PROMISES" safety value in my code and track how many promises are still outstanding after passing their timeout (see the contrived example above). In a high-throughput system where many timeouts occur, these untracked promises will exhaust the heap.
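Until the queue surfaces the promise, one user-land workaround is to wrap each job so the wrapper keeps its own reference to the promise. This is a hypothetical sketch. `trackJob`, `latePromises`, and `tooManyLate` are illustrative names, and `MAX_LONG_PROMISES` is the safety value named above, set to an arbitrary 2. The "late" check uses the wrapper's own timer rather than the queue's `timeout` event. The wrapped function is what would be passed to push().

```typescript
type Job = () => Promise<void>

// Illustrative safety limit; the value is arbitrary.
const MAX_LONG_PROMISES = 2

// Promises whose job outlived the timeout but has not settled yet.
const latePromises = new Set<Promise<void>>()

// Hypothetical wrapper: the caller keeps its own reference to each job's
// promise, so timed-out work can still be counted and awaited later.
function trackJob(job: Job, timeoutMs: number): Job {
  return () => {
    const promise = job()
    const timer = setTimeout(() => latePromises.add(promise), timeoutMs)
    promise
      .catch(() => {}) // errors are left to whoever awaits the returned promise
      .finally(() => {
        clearTimeout(timer)
        latePromises.delete(promise) // settled: no longer outstanding
      })
    return promise
  }
}

// Check intended for a timeout listener: throw or pause when this is true.
function tooManyLate(): boolean {
  return latePromises.size > MAX_LONG_PROMISES
}
```

The Set also gives the caller something to await, for example with `Promise.allSettled([...latePromises])`, before shutting down.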

jessetane commented 1 year ago

Thanks, all, for the additional detail. I will review it as soon as I can.

jessetane commented 1 year ago

So is the easy fix here just to set job.promise, so that the relevant promise is accessible when the timeout fires?

hanseltime commented 1 year ago

@jessetane - I think that would solve the inability to monitor long promises!
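The `job.promise` idea can be sketched roughly as follows. This uses a hypothetical `runJob` helper rather than the library's actual source. The runner stores the promise returned by the job on `job.promise` before arming the timer, so a timeout listener can keep monitoring it after calling next(). The `(next, job)` listener shape mirrors the thread. `TrackableJob`, `runJob`, and the once-only `advance` guard are assumptions.

```typescript
// A job that may carry the promise it returned (`promise` is the proposed field).
interface TrackableJob {
  (): Promise<unknown> | void
  promise?: Promise<unknown>
}

type TimeoutListener = (next: () => void, job: TrackableJob) => void

// Hypothetical runner illustrating the proposal, not the library's source.
function runJob(
  job: TrackableJob,
  timeoutMs: number,
  onTimeout: TimeoutListener,
  next: () => void,
): void {
  let advanced = false
  const advance = () => {
    if (advanced) return // next() runs at most once per job
    advanced = true
    next()
  }

  const result = job()
  if (!(result instanceof Promise)) {
    advance() // synchronous job: nothing to wait for
    return
  }

  job.promise = result // exposed before the timeout can fire
  const timer = setTimeout(() => onTimeout(advance, job), timeoutMs)
  const settle = () => {
    clearTimeout(timer)
    advance()
  }
  result.then(settle, settle)
}
```

Guarding `next()` this way means a job that settles after its timeout does not advance the queue a second time. Meanwhile, the listener can still count or await `job.promise`.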