scala / scala-async

An asynchronous programming facility for Scala
Apache License 2.0

Allow using async in for loops over collections #32

Open stanch opened 11 years ago

stanch commented 11 years ago

Hi,

I think that, in the case of (sequential) collections, this code

for (x ← xs) {
  ... async ...
}

could be rewritten into

val it = xs.iterator
while (it.hasNext) {
  val x = it.next()
  ... async ...
}

thus relaxing the restriction regarding async inside closures. It only saves a few lines, but looks more idiomatic. What do you think?
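
For comparison, the sequential behaviour that the while-loop form aims for can be sketched with nothing but `scala.concurrent`. This is only an illustration under stated assumptions: `foreachAsync` is a hypothetical helper, not part of scala-async, and it mirrors the iterator loop above by resuming the next iteration only once the previous future has completed.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialLoop {
  // Hypothetical helper (not a scala-async API).
  def foreachAsync[A](xs: Iterable[A])(body: A => Future[Unit])(
      implicit ec: ExecutionContext): Future[Unit] = {
    val it = xs.iterator
    // Mirrors `while (it.hasNext) { val x = it.next(); ... }`: the next
    // iteration starts only after the previous future has completed.
    def loop(): Future[Unit] =
      if (it.hasNext) body(it.next()).flatMap(_ => loop())
      else Future.successful(())
    loop()
  }

  // Small usage example: records the order in which elements are visited.
  def demo(): List[Int] = {
    import ExecutionContext.Implicits.global
    val seen = scala.collection.mutable.ListBuffer.empty[Int]
    val all = foreachAsync(List(1, 2, 3)) { x =>
      Future { seen.synchronized { seen += x }; () }
    }
    Await.result(all, 5.seconds)
    seen.toList
  }
}
```

The point of the proposal is that async could generate this kind of state machine from the while loop directly, so the helper would not need to be written by hand.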

retronym commented 10 years ago

This sort of change means that async needs to have special knowledge of collections to undo the translation of for-comprehensions.

An alternative is to use a macro inside the async block:

async {
  foreachMacro(xs) { x =>
    await(f(x))
  }
}

where foreachMacro performs the rewriting you describe. Because that will be expanded before async, we'll just see the while loop.

I guess the problem with adding special cases like this to async is figuring out where to stop. It would also be nice to write: myOption.map(x => await(f(x))).getOrElse(await(default0)).

So we need to give this one a bit of thought. Right now, our top priorities are fixing a few outstanding bugs, and adding support for await in try/catch.

/cc @phaller

stanch commented 10 years ago

I see, that makes a lot of sense. What about using typeclasses to distinguish safely traversable/mappable/... entities? One way would be to embed this functionality into async and provide the instances in a contrib package; the other would be to provide the traverse/map macros you suggest, along with the instances, entirely in contrib. What do you think? Also, if Try is covered, who uses try/catch? :)

lihaoyi commented 10 years ago

+1 for typeclasses for traverse. A big group of higher order functions are of the form f(x: M[A], t: A => B): M[B], including all the collections operations, Option, Future, Try, and many others. Providing an implicit Traverse[M, Future] for each one would allow you to avoid the macro needing special knowledge of collections, allowing the special knowledge to be provided implicitly and the end-user to add extra implicit Traverses for their own specialized use cases.
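
A rough sketch of what such a typeclass could look like, using only the standard library. Everything here is an assumption made for illustration: `FutureTraverse` and `mapAsync` are hypothetical names, not an existing scala-async or Scalaz API, and the instances run the function sequentially.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical typeclass: "M can be traversed with a Future-returning function".
  trait FutureTraverse[M[_]] {
    def traverse[A, B](ma: M[A])(f: A => Future[B])(
        implicit ec: ExecutionContext): Future[M[B]]
  }

  implicit val listTraverse: FutureTraverse[List] = new FutureTraverse[List] {
    def traverse[A, B](ma: List[A])(f: A => Future[B])(
        implicit ec: ExecutionContext): Future[List[B]] = {
      // Sequential: f(a) is called only once the earlier results are in.
      val reversed = ma.foldLeft(Future.successful(List.empty[B])) { (acc, a) =>
        acc.flatMap(bs => f(a).map(b => b :: bs))
      }
      reversed.map(_.reverse)
    }
  }

  implicit val optionTraverse: FutureTraverse[Option] = new FutureTraverse[Option] {
    def traverse[A, B](ma: Option[A])(f: A => Future[B])(
        implicit ec: ExecutionContext): Future[Option[B]] =
      ma match {
        case Some(a) => f(a).map(b => Some(b))
        case None    => Future.successful(None)
      }
  }

  // What a macro might rewrite `m.map(x => await(f(x)))` into.
  def mapAsync[M[_], A, B](ma: M[A])(f: A => Future[B])(
      implicit t: FutureTraverse[M], ec: ExecutionContext): Future[M[B]] =
    t.traverse(ma)(f)

  def demo(): (List[Int], Option[Int]) = {
    import ExecutionContext.Implicits.global
    val doubled = mapAsync(List(1, 2, 3))(x => Future(x * 2))
    val bumped  = mapAsync(Option(20))(x => Future(x + 1))
    (Await.result(doubled, 5.seconds), Await.result(bumped, 5.seconds))
  }
}
```

End users could then add an instance for their own container type without the macro knowing anything about it.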

retronym commented 10 years ago

@pelotom has done a similar job with Scalaz in his "idiom bracket" inspired applicative macro, https://github.com/pelotom/effectful

That needs to be layered on top of Scalaz (for the Applicative / Traverse typeclasses and instances). Type class based approaches do have an impedance mismatch with subtyping. I pushed things as far as I could in Scalaz 7 to find the best tradeoff, but ultimately, to get the most value out of Scalaz, you tend to have to operate in a subset of Scala. It's harder for us to push typeclasses into the standard library.

I do encourage more experimentation in this area. As I mentioned above, you could compose two macros manually:

async { rewriteMapWithTraverse {

}
}

If you like the result, you can make a macro that calls the other two:

myAsync {
}

lihaoyi commented 10 years ago

Having started playing with this library for reals now, here's another useful 90% solution for the "await inside for loop" problem

async {
  for (x <- xs) {
    doStuff(awaitOnce(otherThing))
  }
}

being translated into

async {
  val $fresh = await(otherThing)
  for (x <- xs) {
    doStuff($fresh)
  }
}

It's not a perfect solution, but I feel it'd hit the 90% use case of await inside a HoF pretty well and be far easier to implement than something fancy using traverse.

stanch commented 10 years ago

@lihaoyi I respectfully disagree :) Why would you put doSomethingOnce in a loop anyway?! I have used the while loop numerous times exclusively to await a sequence of futures one by one. An example.

lihaoyi commented 10 years ago

Yeah, it's not a 100% solution, but I feel it'll satisfy some of the need.

FWIW I have just discovered that this works really nicely:

  import scala.async.Async._
  import scalaxy.loops._
  async{
    for(y <- 0 until canvas.height optimized){
      await(Future())
    }
  }

Who'd have thunk it? I'm using it to great effect in some of my code, and it works exactly as you think it would: from for-loop to while-loop to tail-recursive-async-function.

stanch commented 10 years ago

@lihaoyi Interesting! That’s an option for sure.

P.S. Do you mean “who’d have thought it?”, or is it a pun on a well-known programming concept? :)

lihaoyi commented 10 years ago

I suspect in general, stream-fusion macros would be able to satisfy a pretty large set of use cases. e.g. scalaxy-streams inlines {Array, Range, Option, Seq, List} x {filter, filterNot, withFilter, map, flatMap, zipWithIndex} operations. Once inlined, scala-async should be able to take the big mass of while-loops and if statements and do a reasonable transform to make it work.

Now if only there was a stream-fusion library that was solid enough to depend upon. Between scalablitz, scalaxy-streams, and speed, we have no shortage of experimental implementations!

retronym commented 10 years ago

Most (all?) of those libraries use resetAttrs on the argument trees they splice into the result, which I've taken pains to avoid here in async as it currently guarantees that the macro will fail for some inputs.

lihaoyi commented 10 years ago

Yeah, but I'd consider that an implementation detail. I think the approach is pretty reasonable: it's not quite as "general" as a monadic-traverse based implementation, since it's basically hard-coding a bunch of special cases, but the fact that we can share these hardcoded special cases with the people who do it for the performance makes it less bad.

Again, none of the libraries I've linked are anywhere near production quality, so we're not there yet =)

Atry commented 10 years ago

I think this feature does not require any changes to scala.async; you just need some wrappers for the collections.

I have implemented such a futureSeq wrapper for Stateless Future (a project like scala.async).

I hope this approach would be helpful.

fgoepel commented 7 years ago

Scala-gopher apparently uses async with some additional macros to allow it to work inside for loops:

Go is implemented on top of SIP-22 async and shares the same limitations. In addition to the async/await transform, go provides lifting of asynchronous expressions inside some well-known higher-order functions (i.e. it is possible to use async operations inside for loops). Details are available in the tech report: https://arxiv.org/abs/1611.00602

Maybe this could be generalized and included in scala-async itself?

kghost commented 5 years ago

As everybody may know, the following code:

async {
  for {i <- xs} yield {
    await(i)
  }
}

is translated by the compiler to:

async {
  xs.map { i =>
    await(i)
  }
}

So, in order to support async inside a for-loop, async versions of map, flatMap and forEach must be provided by the original container. After that, we can translate the code to:

async {
  await(xs.mapAsync { i =>
    async {
      await(i)
    }
  })
}

where the signature of mapAsync is

// C stands for the collection type of xs
trait C[T] {
  def map[R](f: T => R): C[R]
  def mapAsync[R](f: T => Future[R]): Future[C[R]]
}

To achieve this, we may use implicit conversion:

// Iterable[R] stands in for "the same collection type as the receiver"
trait AsyncIterable[T] {
  def map[R](f: T => Future[R]): Future[Iterable[R]]
  def flatMap[R](f: T => Future[Iterable[R]]): Future[Iterable[R]]
  def forEach[R](f: T => Future[R]): Unit
}

Then provide an implicit conversion helper to convert collections to AsyncIterable:

object AsyncConverters {
  implicit def iterableToAsyncIterable[T](xs: Iterable[T]): AsyncIterable[T] = ...
}

Whenever we see await inside a nested function scope, we try to find the async variant of that function and use it.
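
To make the idea concrete, here is a minimal standard-library sketch of the enrichment side only. It is an assumption-laden illustration: `AsyncIterableOps` is a hypothetical name, the result is fixed to `Vector` rather than the receiver's own collection type, and the async-side rewrite that would pick `mapAsync` automatically is not shown.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncConvertersSketch {
  // Hypothetical enrichment (not part of scala-async): adds an async
  // variant of map to any Iterable.
  implicit class AsyncIterableOps[T](xs: Iterable[T]) {
    def mapAsync[R](f: T => Future[R])(
        implicit ec: ExecutionContext): Future[Vector[R]] =
      // Elements are processed one after another, preserving order.
      xs.foldLeft(Future.successful(Vector.empty[R])) { (acc, x) =>
        acc.flatMap(rs => f(x).map(r => rs :+ r))
      }
  }

  def demo(): Vector[String] = {
    import ExecutionContext.Implicits.global
    // Hand-written form of what the translated for-comprehension would call.
    val result = List(1, 2, 3).mapAsync(i => Future(s"item-$i"))
    Await.result(result, 5.seconds)
  }
}
```

Returning the receiver's own collection type, as the signatures above intend, would need a builder or a typeclass per collection.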