supercharge / promise-pool

Map-like, concurrent promise processing
https://superchargejs.com/docs/promise-pool
MIT License

Retrieve task index? #42

Open FezVrasta opened 2 years ago

FezVrasta commented 2 years ago

I'm running the following code where my functions need to pick a web worker from a list in order to run.

const workersList = [worker1, worker2];
await PromisePool
    .withConcurrency(workersList.length)
    .for(items)
    .process(async (item) => {
        // how do I know which "executor" (slot) is running this task?
        const worker = workersList[executorIndex];

        await runMyLogic(item, worker);
    });

Right now I can't seem to find any way to know which executor is being used by the pool. Could you advise?

marcuspoehls commented 2 years ago

@FezVrasta Hey Federico, the promise pool provides the current item index as the second argument in the process method:

const users = ['Federico', 'Marcus', 'Supercharge']

await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    // use the `index`
  })

Is that what you’re looking for?

FezVrasta commented 2 years ago

Thanks Marcus, but that's the index in the original users array. I'm looking for the id or index of the "worker" that is processing the data.

So, let's say the concurrency limit is set to 4: I need to know in which of the 4 available "slots" the logic is being executed.

marcuspoehls commented 2 years ago

The package doesn't expose any "worker" information to you. All concurrency "slots" run in parallel. There’s no "worker". Under the hood, this package uses promises to wrap the processing. The "worker" is basically just a promise. Promises are eager and start their execution as soon as you create them. Then the promise pool retrieves the value from the promise once it either resolves or rejects.
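
Roughly speaking, and reusing the names from your original snippet, each "slot" boils down to something like this (a simplified sketch, not the actual library code):

const task = (async () => {
  // this body starts executing as soon as the promise is created,
  // there is no separate "worker" or "slot" behind it
  return runMyLogic(item, worker)
})()

// the pool only has to await the already-running promise
const result = await task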

FezVrasta commented 2 years ago

I understand, but the whole point of the library is to limit the number of promises being evaluated at the same time, right?

I then need to know which of the allocated "threads" is being used by a given process execution.

I'm not sure if I'm being clear, I'm sorry.

marcuspoehls commented 2 years ago

Yep, this package limits the number of promises being handled concurrently.

Can you describe your use case? I’m not sure I understand what you’re looking for.

FezVrasta commented 2 years ago

It's what I explained in the original post: I have a pre-allocated number of web workers and I want each thread to use one of them.

Without the information I'm looking for, I need to implement separate idle/busy state management for my workers list and have each process execution pick one of the available workers. If instead I had this information, each process execution would already know which worker is allocated to it.
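
Something along these lines is what I'd have to write myself, reusing the names from my original snippet (a rough sketch):

import { PromisePool } from '@supercharge/promise-pool';

const idleWorkers = [...workersList];          // every worker starts out idle

await PromisePool
    .withConcurrency(workersList.length)       // at most one task per worker
    .for(items)
    .process(async (item) => {
        const worker = idleWorkers.pop();      // claim an idle worker
        try {
            await runMyLogic(item, worker);
        } finally {
            idleWorkers.push(worker);          // hand it back for the next task
        }
    });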

FezVrasta commented 2 years ago

I think what you call these in the source code is "tasks"; I need the task index.

https://github.com/supercharge/promise-pool/blob/main/src/promise-pool-executor.ts#L300-L314

This would become something like this:

  startProcessing (item: T, index: number): void {
-   const task: Promise<void> = this.createTaskFor(item, index)
+   const task: Promise<void> = this.createTaskFor(item, index, this.tasks().length)
      .then(result => {
        this
          .removeActive(task)
          .save(result)
      })
      .catch(async error => {
        return this
          .removeActive(task)
          .handleErrorFor(error, item)
      })

    this.tasks().push(task)
  }

With this.tasks().length I get the index of the task that will be inserted right below.

I'm not sure how concurrency-safe my change is, but I hope it gives the idea.

This change would ensure the taskIndex will always be unique across all the running tasks.

marcuspoehls commented 2 years ago

@FezVrasta Hey Federico, I got sidetracked yesterday. Thank you for your patience!

I feel like the promise pool may not be the right tool for you. The promise pool is like Array.map with concurrency. It runs a list of items in parallel through a processing function.

I’m not sure your proposed change helps in your situation. The value of this.tasks().length is the number of currently active tasks. Imagine a promise pool running at most 2 tasks concurrently. The pool started and has 2 active tasks. Let’s say the task that started first finishes processing and gets removed from the list of active tasks. The pool now has 1 active task (the second task). Starting a new task will give you a task length of 1. You received the same task length of 1 when starting the second task earlier. Do you know what I mean?
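
As a toy illustration (hypothetical values, a pool with a concurrency of 2):

const active = []                           // stands in for this.tasks()

active.push('task A')                       // A started with active.length === 0
active.push('task B')                       // B started with active.length === 1

active.splice(active.indexOf('task A'), 1)  // A finishes and gets removed

// the next task would start with active.length === 1 again,
// the same "index" that task B already received
active.push('task C')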

FezVrasta commented 2 years ago

Yes, my example doesn't work, but you could return the index of the task (via indexOf), or just pre-allocate an array with N positions (one for each task), set a position to null when its task finishes, and allocate the next task to the first available spot.

> I feel like the promise pool may not be the right tool for you.

This is definitely the right tool for me. I got a version of my logic working with your library; it's just that I need to handle the worker allocation manually, which is a bit of an annoyance considering that this library already has all the info I need but doesn't expose it.

FezVrasta commented 2 years ago

To add more context, I found this library that goes one step further: https://www.npmjs.com/package/tiny-promise-pool

Its result exposes the thread that has been allocated to run the promise, but, unfortunately, doesn't expose that information to the promise execution.

marcuspoehls commented 2 years ago

Ok, looks like I’m missing something here 😃

Let me describe what I understand:

If I’m understanding this correctly, you’re missing the information about which "task" inside the promise pool finished processing. Knowing which "task" finished is important because you’re mapping tasks to individual worker threads. And when you know which task finished, you know which worker thread can take the next task. Is that correct?

FezVrasta commented 2 years ago

Yes, correct.

marcuspoehls commented 2 years ago

Ok 👍 I’ll think this through and figure out how to implement it.