scala / scala-library-next

backwards-binary-compatible Scala standard library additions
Apache License 2.0

Something like Future.sequentialRun/chunkRun #71

Open exoego opened 3 years ago

exoego commented 3 years ago

In parallel processing, I think it is a relatively common use case to limit the number of tasks running concurrently.

For example, let's say we have 10,000 files and want to upload them somewhere 5 at a time, to avoid taking the service down by uploading all 10,000 files at once.

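The Scala standard library currently offers Future.sequence, whose name sounds suitable for this purpose, but unfortunately it is not: it only gathers the results of Futures that are already running and does nothing to limit how many run at once.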
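To illustrate the point: a Future is handed to its ExecutionContext as soon as it is created, so by the time Future.sequence receives the collection, every task has already been submitted. The following is a minimal sketch, not taken from the issue; `upload` and `countingEc` are hypothetical names, and the wrapper context exists only to count submissions.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val submitted = new AtomicInteger(0)

// Hypothetical wrapper around the global context that counts submitted tasks.
implicit val countingEc: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = {
    submitted.incrementAndGet()
    ExecutionContext.global.execute(runnable)
  }
  def reportFailure(cause: Throwable): Unit =
    ExecutionContext.global.reportFailure(cause)
}

// Hypothetical stand-in for an upload.
def upload(id: Int): Future[Int] = Future {
  Thread.sleep(10)
  id
}

// `map` creates all 20 Futures, which submits all 20 tasks immediately.
val inFlight = (1 to 20).map(upload)
val submittedBeforeSequence = submitted.get()

// Future.sequence only collects the results; it cannot throttle anything.
val results = Await.result(Future.sequence(inFlight), 10.seconds)
```

Any throttling therefore has to happen before the Futures are created, which is what the utilities in this thread do.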

For this use case, my colleagues and I often find ourselves defining utilities like the ones below:

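import scala.concurrent.{ExecutionContext, Future}

// Runs `op` on one item at a time; the next call starts only after the previous Future completes.
def sequentialRun[T, U](items: IterableOnce[T])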
                       (op: T => Future[U])
                       (implicit ec: ExecutionContext): Future[Seq[U]] = {
  items.iterator
    .foldLeft(Future.successful(Vector.empty[U])) { (f, item) =>
      f.flatMap { acc =>
        op(item).map(acc :+ _)
      }
    }
}

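// Splits `items` into chunks of `chunkSize`; each chunk runs in parallel, and the chunks run one after another.
def chunkRun[T, U](chunkSize: Int, items: Seq[T])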
                  (op: T => Future[U])
                  (implicit ec: ExecutionContext): Future[Seq[U]] = {
  sequentialRun(items.grouped(chunkSize)) { item =>
    Future.traverse(item)(op)
  }.map(_.flatten)
}

// use-site
chunkRun(5, files) { file =>
  asynchronouslyUploadToSomewhere(file)
}

I think it would be helpful if scala-library-next offered a similar feature.

julienrf commented 3 years ago

Another way to achieve this is to use an ExecutionContext with a bounded number of threads. Unfortunately, there is no super simple Scala-ish way to do it currently. Here is what I do to create an ExecutionContext with at most one thread (to get things running sequentially), and five threads:

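// Assumption: ThreadFactoryBuilder is Guava's (com.google.common.util.concurrent).
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Daemonic so that it won't prevent the application from shutting down
val daemonicThreadFactory = new ThreadFactoryBuilder().setDaemon(true).build()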
val oneThread = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(daemonicThreadFactory))
val fiveThreads = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5, daemonicThreadFactory))

// then…
def asynchronouslyUploadToSomewhere(file: File): Future[Uploaded] = Future {
  ...
}(fiveThreads /* <− THIS IS THE IMPORTANT PART */)

// Then I can just use the good old `traverse`
Future.traverse(files)(asynchronouslyUploadToSomewhere)

So, the responsibility of chunking / grouping the tasks and collecting their results is delegated to the underlying scheduler.

It is a bit less flexible than what you propose because the parallelism level is fixed once and for all in the execution context (i.e., there is no chunkSize parameter in my version), and creating a separate execution context for every parallelism level you need may not be practical.

Another issue that I see with the current API is that it is a bit cumbersome to create the execution context. For comparison, here is the Monix equivalent:

- // Daemonic so that it won't prevent the application from shutting down
- val daemonicThreadFactory = new ThreadFactoryBuilder().setDaemon(true).build()
- val oneThread = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(daemonicThreadFactory))
+ val oneThread = Scheduler.fixedPool("sequential", poolSize = 1)
- val fiveThreads = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5, daemonicThreadFactory))
+ val fiveThreads = Scheduler.fixedPool("five-threads", poolSize = 5)

That being said, I agree that it would be useful to have a variant of Future.traverse that works chunk by chunk, and that takes the size of the chunks as a parameter.

object Future {
  def traverse[A, B](as: Seq[A], parallelismLevel: Int)(f: A => Future[B]): Future[Seq[B]]
}
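One possible way to implement such a variant is sketched below. The name `traverseBounded` and the worker-based strategy are illustrative assumptions, not part of any proposed API. Instead of working strictly chunk by chunk, it starts up to `parallelismLevel` workers that each claim the next unprocessed index from a shared counter, so at most `parallelismLevel` calls to `f` are in flight and results keep the input order.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: like Future.traverse, but with bounded concurrency.
def traverseBounded[A, B](as: Seq[A], parallelismLevel: Int)(f: A => Future[B])
                         (implicit ec: ExecutionContext): Future[Seq[B]] = {
  require(parallelismLevel > 0, "parallelismLevel must be positive")
  val items = as.toVector
  val next = new AtomicInteger(0)

  // Each worker repeatedly claims the next unprocessed index until none are left.
  def worker(acc: Vector[(Int, B)]): Future[Vector[(Int, B)]] = {
    val i = next.getAndIncrement()
    if (i >= items.length) Future.successful(acc)
    else
      Future.unit
        .flatMap(_ => f(items(i))) // also turns a synchronous throw into a failed Future
        .flatMap(b => worker(acc :+ (i -> b)))
  }

  val workers = Vector.fill(math.min(parallelismLevel, items.length))(worker(Vector.empty))
  // Merge the per-worker results and restore the input order.
  Future.sequence(workers).map(_.flatten.sortBy(_._1).map(_._2))
}

// Usage (illustrative): double 1..10 with at most 3 calls in flight.
import scala.concurrent.ExecutionContext.Implicits.global

val doubled = Await.result(traverseBounded(1 to 10, 3)(i => Future(i * 2)), 10.seconds)
```

In this sketch a failure stops only the worker that hit it. The returned Future fails, but the remaining workers keep processing items. Whether that is the desired behaviour is a design choice a real API would need to settle.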

makingthematrix commented 3 years ago

@julienrf I was just thinking about something like this. In wire-signals we made a subclass of ExecutionContext, DispatchQueue, which lets us control the number of parallel operations: https://github.com/wireapp/wire-signals/blob/main/core/src/main/scala/com/wire/signals/DispatchQueue.scala We don't use it much, though, except for the special case where we want to limit the "chunk" size to one.

viktorklang commented 3 years ago

Having thought about this for a while, I think that instead of creating a proliferation of overloaded methods with a parallelism parameter, the "correct" fix would be to make it easier to limit the parallelism of ExecutionContexts. Of course this is a bit more difficult than the simplest solution, since we also need to take blocking/BlockContext into account.

Being able to do something to the effect of implicit val newEc = ExecutionContext.limitParallelism(ec, 5) would be nice.

viktorklang commented 3 years ago

I haven't given BlockContext any thought yet, but perhaps something like this could work (I will have to study all possible interactions):

def limitParallelism(executionContext: ExecutionContext, maxParallelism: Int): ExecutionContext = {
  require(maxParallelism > 0)
  require(executionContext ne null)

  new java.util.concurrent.atomic.AtomicInteger(maxParallelism) with ExecutionContext {
    private[this] final val queue = new java.util.concurrent.ConcurrentLinkedQueue[Runnable]()
    override final def reportFailure(cause: Throwable): Unit = executionContext.reportFailure(cause)
    override final def execute(runnable: Runnable): Unit = {
      queue.add(runnable)
      schedule()
    }

    private[this] final def schedule(): Unit = {
      val permits = get()
      if (permits > 0) {
        if (compareAndSet(permits, permits - 1)) {
          executionContext.execute(new Runnable {
            override final def run(): Unit =
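              queue.poll() match {
                case null =>
                  incrementAndGet() // return the permit
                  // A task may have been enqueued after poll() but before the permit
                  // was returned; re-check so that it is not left stranded in the queue.
                  if (!queue.isEmpty) schedule()
                case some =>
                  try some.run()
                  finally executionContext.execute(this)
              }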
          })
        } else schedule() // retry
      }
    }
  }
}

alexklibisz commented 3 years ago

> For this use case, my colleagues and I often find ourselves defining utilities like the ones below:

I've written this utility a few times. Totally agree it's worth having in the stdlib or a closely-adjacent library that doesn't require a full effect system.

I'll mention another option that hasn't come up so far: introduce an asynchronous semaphore.

If you have a semaphore like this:

final class AsyncSemaphore (permits: Int) {
  def acquire(): Future[Unit] = ???
  def release(): Future[Unit] = ???

  def withPermit[A](f: () => Future[A])(implicit ec: ExecutionContext): Future[A] =
    for {
      _ <- acquire()
      ta <- f().transformWith(Future.successful) // Lift to a Future[Try[A]] to avoid short-circuiting on failure.
      _ <- release()
      a <- Future.fromTry(ta) // Drop back to a Future[A].
    } yield a
}

Then you can use it to constrain parallelism as follows:

val as: Seq[A] = ??? // Lots of elements, need to constrain parallelism
val f: A => Future[B] = ??? // Apply this to each A.
val n = 16 // This many at a time.

val sem = new AsyncSemaphore(n)
val results = Future.sequence(as.map(a => sem.withPermit(() => f(a))))

Here's a Scastie example of one possible implementation of this semaphore: https://scastie.scala-lang.org/G4Of7yLARza1C7e0OGRSyQ
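For readers who prefer not to follow the link, here is one minimal way `acquire` and `release` could be filled in. This is a sketch under assumptions and not necessarily the linked implementation. It guards a permit counter and a queue of waiting promises with `synchronized`, and `release` returns an already-completed Future only to keep the signature shown above. `withPermit` is restated so that the permit is also released if `f` throws synchronously.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

final class AsyncSemaphore(permits: Int) {
  require(permits > 0, "permits must be positive")

  private var available = permits
  private val waiters = mutable.Queue.empty[Promise[Unit]]

  // Completes immediately if a permit is free; otherwise when one is released.
  def acquire(): Future[Unit] = synchronized {
    if (available > 0) { available -= 1; Future.unit }
    else { val p = Promise[Unit](); waiters.enqueue(p); p.future }
  }

  // Hands the permit directly to the oldest waiter, or returns it to the pool.
  def release(): Future[Unit] = {
    val next = synchronized {
      if (waiters.nonEmpty) Some(waiters.dequeue())
      else { available += 1; None }
    }
    next.foreach(_.success(())) // complete the waiter outside the lock
    Future.unit
  }

  // Runs `f` while holding a permit; the permit is released on success and on failure.
  def withPermit[A](f: () => Future[A])(implicit ec: ExecutionContext): Future[A] =
    acquire()
      .flatMap(_ => f())
      .transformWith(result => release().flatMap(_ => Future.fromTry(result)))
}

// Usage (illustrative): square 1..10 with at most 2 tasks holding a permit.
import scala.concurrent.ExecutionContext.Implicits.global

val demoSem = new AsyncSemaphore(2)
val squares = Await.result(
  Future.sequence((1 to 10).map(i => demoSem.withPermit(() => Future(i * i)))),
  10.seconds
)
```

Unlike a bounded thread pool, this limits the number of in-flight Futures rather than the number of threads, so it also works when `f` is non-blocking.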