timdp / es6-promise-pool

Runs Promises in a pool that limits their concurrency.
MIT License

Make the PromisePool reusable for multiple batch of tasks #59

Open quantang opened 6 years ago

quantang commented 6 years ago

Hi Tim,

I was wondering whether we could make the PromisePool object reusable, meaning that start() could be called multiple times, even after the pool has finished its tasks.

My scenario is as follows: the tasks (promises) are generated gradually and stored in a list. I want to start running tasks as soon as the first batch is ready, so start() will be called before all the tasks have been generated. If new tasks arrive later, I can add them to the list (or pool) and the PromisePool can just keep working. The number of concurrently running tasks should stay limited throughout.

The current problem is that once the task generator reaches the end of the task list, the PromisePool stops permanently, even if I add new tasks and call start() again. It is stopped by the _done variable. If start() reset _done to false, the PromisePool should be able to keep working on the new tasks.

I cannot create a new PromisePool for the new tasks either, as the previous pool may still be working. If a new pool is created, the number of concurrently running tasks may exceed the limitation, as each pool can reach the max.

karlhorky commented 6 years ago

I cannot create a new PromisePool for the new tasks either, as the previous pool may still be working. If a new pool is created, the number of concurrently running tasks may exceed the limitation, as each pool can reach the max.

What you can do is set a variable to keep track of whether there is a started pool or not:

let poolStarted
const pendingItems = getInitialPendingItems()
const inFlightItems = []

function promiseProducer () {
  const item = pendingItems.shift()

  if (!item) {
    poolStarted = false
    return null
  }

  inFlightItems.push(item)

  // Return the promise so the pool can track this task
  return (async () => {
    let data

    try {
      data = await fetchItem(item)
    } catch (err) {
      // Re-queue item
      pendingItems.push(item)
    }

    inFlightItems.splice(inFlightItems.indexOf(item), 1)

    // This could also be elsewhere, in a setInterval or something
    if (pendingItems.length > 0 && inFlightItems.length < 1 && !poolStarted) {
      startPool()
    }
  })()
}

function startPool () {
  poolStarted = true
  const pool = new PromisePool(promiseProducer, 3)
  pool.start()
}

startPool()

// Later...

pendingItems.push({id: 5})

quantang commented 6 years ago

Hi @karlhorky , thanks for your suggestion and the code example.

Let's consider the following scenario:

  1. We would like to use the pool to limit the number of concurrently running tasks to 10 (globally).
  2. We retrieve tasks from an external source every 5 seconds, and each task needs 10 seconds to finish.
  3. We get 8 tasks at time 0s, put them in "pendingItems" and start the pool. As all the tasks start running (8 < 10), "pendingItems" is emptied and "poolStarted" becomes "false".
  4. We get another 8 tasks at time 5s and push them to "pendingItems". What should we do next?

Your code will call startPool(), as "poolStarted" was already changed to "false" in step 3. In other words, a new pool will be created and will run all the new tasks. Therefore, we will have 16 tasks running concurrently at this moment (8 tasks still running in the first pool and the 8 new tasks running in another pool), which exceeds our limit of 10 concurrent tasks.

What I suggested is to reuse the first pool and make it call promiseProducer() again, so it can get another 2 tasks from "pendingItems" and bring the number of concurrently running tasks to 10.

I hope my explanation is clear. Please feel free to tell me if you are confused.

Cheers.

karlhorky commented 6 years ago

Right, sorry, I didn't address this requirement of limiting the concurrency. I've added inFlightItems to keep track of items still being processed.

quantang commented 6 years ago

Hi @karlhorky, please correct me if I am wrong.

My understanding is that the PromisePool stops calling promiseProducer() once promiseProducer() returns null. So your newly added inFlightItems prevents a new pool from being created; however, the currently running pool won't fetch new items from pendingItems once the initial 8 tasks have been started (as the 9th call of promiseProducer() returned null).

You can try your code here: https://repl.it/@tsw_tq/reusePromisePool

I also included my proposal at the end of the code; you can change runYourCode to false and run it again. You will see the difference.

As I mentioned, my suggestion is to make the PromisePool reactivatable so that it starts calling promiseProducer() again, which requires resetting pool._done to false when pool.start() is called.

karlhorky commented 6 years ago

Yep, it was more of a suggestion of an idea than 100% running code :blush:

Like I mentioned in the comment, the code that restarts the promise pool could be elsewhere like in a setInterval, as in this code here: https://repl.it/@karlhorky/reusePromisePool


This solution aside, I think that your suggestion is a good one and I support having it in the library itself, as in vilic/promise-pool

timdp commented 6 years ago

Hey guys,

I really appreciate all the feedback here and I apologize for not responding to it sooner. Busy times, I'm sure you know how it goes.

So anyway. As I understand it, this issue originally tried to deal with lists of promises of indefinite size. That is,

  1. When you start the pool, you don't know how many promises there are going to be.

  2. Once all the currently generated promises have been fulfilled, you don't know whether more promises are going to be generated later on.

At this point, definitely correct me if I'm wrong. I'm going to try to offer a solution to both these challenges.

The first one is inherently solved by the current API, as it doesn't require you to pass an array of promises the way Promise.all() does, but instead uses a producer function or iterator, both of which represent an Iterator pattern. So as long as you do have work left to do, you can just keep creating promises.

The second challenge is a bit trickier, and I believe that's where the "reuse" comes in. It arises when there's a period of idle time between runs. It could be solved by indeed allowing start() to be called multiple times, or by adding a resume().

I'm hesitant to add this feature to the pool itself for various reasons:

So what if instead of reusing the pool, you could keep it alive? That is, what if you could prevent it from entering the "done" state in that period where you don't have any new tasks yet? Here's a little demo that accomplishes that:

const PromisePool = require('es6-promise-pool')
const defer = require('p-defer')
const delay = require('delay')

const concurrency = 3
const jobDuration = 1000
const initialJobCount = 5
const finalJobCount = 5

const startTime = Date.now()
const log = msg => console.log(`[+${Date.now() - startTime}] ${msg}`)

let pool
let pendingJobCount = initialJobCount

const lockDeferred = defer()
const lock = {
  active: true,
  promise: lockDeferred.promise,
  release: () => {
    lock.active = false
    lockDeferred.resolve()
  }
}

setTimeout(() => {
  pendingJobCount += finalJobCount
  log('unlock')
  lock.release()
  pool.concurrency(concurrency)
}, jobDuration * 3)

const producer = () => {
  if (pendingJobCount > 0) {
    log('next')
    --pendingJobCount
    return delay(jobDuration).then(() => log('resolve'))
  } else if (lock.active) {
    log('lock')
    pool.concurrency(1)
    return lock.promise
  } else {
    log('flush')
    return null
  }
}

pool = new PromisePool(producer, concurrency)

pool.start()
  .then(() => {
    log('done')
  })

Essentially, once the last known promise (for the time being) has been produced, the producer returns a placeholder promise (created with p-defer), which acts as a semaphore. That placeholder won't resolve until all the promises that will ever exist have been created, i.e., until all the work that will ever come up has been scheduled into the pool. That last part is simulated using setTimeout(): after a few seconds, we add work once more (just as a demo; it could also happen earlier). Finally, we release the placeholder promise so that the pool can pick up the new work. By temporarily reducing the concurrency to 1 while the pool is idle, we prevent adding the placeholder to the pool multiple times.

I feel like this concept is pretty extensible. You could even wrap it in a library if you want.

Alternatively, I do also think there's a way you could create a new pool every time. You would just have to keep track of pending promises yourself as well. Or maybe there's a way you can create a pool of pools; I'd have to think about that one some more.

Does that help at all?