typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0

IO async + shift is confusing #370

Closed JohnReedLOL closed 6 years ago

JohnReedLOL commented 6 years ago

See question: https://stackoverflow.com/questions/52437675/scala-cats-effects-io-async-shift-how-does-it-work

alexandru commented 6 years ago

Hi @JohnReedLOL,

I replied to your SO question.

In an effort to keep the issue tracker clean, will now close this, but feel free to add extra comments.

JohnReedLOL commented 6 years ago

Okay, so if I can get the same "wait for completion" semantics from contextShift.evalOn(IO.apply{...}), and I can just shift to the thread that my IO.async block would use, then why would I ever use IO.async? Like can't I just replace calls to IO.async with blocking calls to IO.apply?

alexandru commented 6 years ago

@JohnReedLOL blocking calls are problematic so you don't want to do it if you can help it.

Read this: https://monix.io/docs/3x/best-practices/blocking.html

JohnReedLOL commented 6 years ago

I mean can't any async IO just be replaced with a shift followed by a synchronous IO? Like let's say you have an async IO that executes in an Executors.newSingleThreadExecutor(). Can't I just replace that with _ <- IO.shift(Executors.newSingleThreadExecutor()) followed by a regular IO (IO.apply{...}) which wraps the same commands that the async IO wrapped?

alexandru commented 6 years ago

I mean can't any async IO just be replaced with a shift followed by a synchronous IO?

Obviously no.

You're confusing asynchrony with multi-threading. They are not the same. An asynchronous process is something that doesn't execute on the current thread or call stack. It can be something that executes on another node on the network.

shift + IO.delay will yield an asynchronous IO, but that implication only works in one direction, the reverse not being true.
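As a minimal sketch of the other direction, assuming the cats-effect 1.x/2.x API that was current for this thread: the IO below is asynchronous, yet it is not "shift to a thread, then run a synchronous block". The registration function only records a timer entry and returns; the result arrives later through the callback. The names AsyncWithoutShift, later and scheduler are hypothetical and exist only for illustration.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import cats.effect.IO

object AsyncWithoutShift {
  // Hypothetical scheduler; a daemon thread so the sketch never keeps the JVM alive
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "timer-sketch")
        t.setDaemon(true)
        t
      }
    })

  // Completes with `value` roughly `millis` milliseconds after it is run.
  // The body of IO.async returns immediately; no Thread.sleep is involved.
  def later[A](value: A, millis: Long): IO[A] =
    IO.async[A] { cb =>
      scheduler.schedule(
        new Runnable { def run(): Unit = cb(Right(value)) },
        millis,
        TimeUnit.MILLISECONDS
      )
      () // the registration function itself returns Unit
    }
}
```

Rewriting later as a shift followed by IO.delay would require a Thread.sleep inside the delayed block, which ties up a thread for the whole wait; that is the blocking the linked article warns about.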

JohnReedLOL commented 6 years ago

Like can't I just replace this:


  def forkAsync(toRun: => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    executor.execute(new Runnable {
      def run(): Unit = {
        try {
          toRun // Note: This line evaluates the by-name body, whose result is of type Unit.
          callback(Right(())) // On success, the callback receives the unit value
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure, it receives the exception
        }
      }
    })
  }

forkAsync(println("Hello World"))(Executors.newSingleThreadExecutor()).unsafeRunSync()

With this:

(IO.shift(ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())).flatMap( _ => IO{println("Hello World")})).unsafeRunSync()

And it would be essentially the same thing?

JohnReedLOL commented 6 years ago

Sorry, I was typing that while you were replying. Anyway, this is pretty confusing. So asynchronicity is like when you wait for the message to arrive that the result is available and do something else while you're waiting.

So let's say I had one task that returns an int, a second task that returns a long, a third task that just prints "Hello World", and a fourth task that sums together the int and the long. And I wanted to do everything on a single thread. Also, I don't want the third task to wait for the first two tasks - let's say they sleep for a second and then return a value. But I do want the fourth task to wait for the first two tasks to complete. And I want to order these tasks as a series of IO's where task1 chains to task2 which chains to task3 which chains to task4.

How would you write that?

JohnReedLOL commented 6 years ago

An asynchronous process is something that doesn't execute on the current thread or call stack

Wait a second. According to: https://stackoverflow.com/questions/34680985/what-is-the-difference-between-asynchronous-programming-and-multithreading

There is such a thing as "Asynchronous, single threaded". How can there be such a thing as "Asynchronous, single threaded" if by your definition "An asynchronous process is something that doesn't execute on the current thread or call stack"?

JohnReedLOL commented 6 years ago

Also, in Cats Effect, the definition of "callback" in async is kind of weird. Normally "callback" is just a function that you pass and this function is executed when a result is available. But your callbacks just look like an object which takes in an Either, which in turn wraps a value. Like your "cb" parameter doesn't appear to actually execute a function. It appears to just wrap a value that a function returns. If a complete beginner like me is totally flabbergasted this is a sign that there is something unclear.

alexandru commented 6 years ago

There is such a thing as "Asynchronous, single threaded". How can there be such a thing as "Asynchronous, single threaded" if by your definition "An asynchronous process is something that doesn't execute on the current thread or call stack"?

Not sure where you see the inconsistency. JavaScript is single threaded and asynchrony in JavaScript is even more relevant than on the JVM, since you cannot block for the result of asynchronous processes, which is why in JS most I/O operations are async. Cats-Effect obviously works on top of JavaScript as well (compiled to JS via Scala.js).

Like your "cb" parameter doesn't appear to actually execute a function. It appears to just wrap a value that a function returns. If a complete beginner like me is totally flabbergasted this is a sign that there is something unclear.

Not sure where the confusion is. cb is a function that you're given to signal the result when ready.

See the definition for a callback: https://en.wikipedia.org/wiki/Callback_(computer_programming)

Avasil commented 6 years ago

So let's say I had one task that returns an int, a second task that returns a long, a third task that just prints "Hello World", and a fourth task that sums together the int and the long. And I wanted to do everything on a single thread. Also, I don't want the third task to wait for the first two tasks - let's say they sleep for a second and then return a value. But I do want the fourth task to wait for the first two tasks to complete. And I want to order these tasks as a series of IO's where task1 chains to task2 which chains to task3 which chains to task4.

How would you write that?

One way of doing it:

val io: IO[(Int, Long)] = 
  for {
    int <- task1
    long <- task2 
  } yield (int, long)

for {
  fiber <- io.start // starts in the background; the for-comprehension continues
  _ <- task3.start // fire and forget; the for-comprehension continues
  pair <- fiber.join // wait for the io result, a tuple (Int, Long)
  result <- task4(pair._1, pair._2)
} yield result

How many threads are used depends on your thread pool.
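For completeness, here is a self-contained sketch of the same idea, assuming cats-effect 1.x/2.x (where start needs a ContextShift and IO.sleep needs a Timer). The definitions of task1 through task4 are hypothetical stand-ins for the tasks described in the question, and the global execution context is an arbitrary choice; a single-threaded pool could be substituted.

```scala
import cats.effect.{ContextShift, IO, Timer}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object FiberSketch {
  // Assumed pool; swap in a single-threaded ExecutionContext if desired
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Hypothetical tasks: the first two wait a second (without blocking a thread)
  val task1: IO[Int] = IO.sleep(1.second).map(_ => 1)
  val task2: IO[Long] = IO.sleep(1.second).map(_ => 2L)
  val task3: IO[Unit] = IO(println("Hello World"))
  def task4(i: Int, l: Long): IO[Long] = IO(i + l)

  val program: IO[Long] =
    for {
      fiber <- (for { i <- task1; l <- task2 } yield (i, l)).start // runs in the background
      _ <- task3.start // does not wait for task1 or task2
      pair <- fiber.join // task4 waits for both results
      sum <- task4(pair._1, pair._2)
    } yield sum
}
```

Running FiberSketch.program.unsafeRunSync() should yield 3 after about two seconds, with "Hello World" printed without waiting for the sleeps.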

JohnReedLOL commented 6 years ago

Something about async IO still isn't clear. Here is some example code:

import cats.effect.IO
import scala.util.control.NonFatal
object Program {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  def doHomework(subject: String, toRun: () => Unit): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    println(s"Starting my ${subject} homework." + "__" + Thread.currentThread().getName) // Starting my math homework.__main
    Thread.sleep(Long.MaxValue)
    try {
      val effectResult = toRun()
      callback.apply(Right.apply(effectResult)) // On success, the callback returns nothing
    } catch {
      case NonFatal(t) => callback.apply(Left(t)) // On failure, it returns an exception
    }
  }

  val io: IO[Unit] = doHomework("math", () => {
    println("Finished my homework" + "__" + Thread.currentThread().getName) // Finished my homework__main
  })

  import scala.reflect.runtime.universe
  def getTypeTag[T: universe.TypeTag](obj: T): universe.TypeTag[T] = universe.typeTag[T]

  object Foo

  def main(args: Array[String]): Unit = {

    val theType: universe.Type = getTypeTag(Foo).tpe
    println("Foo: " + Foo.toString)
    println("theType: " + Foo.getClass.getName)
    println("theType: " + theType)

    val mySyncIO = IO{println("Hello World")}
    println("mySyncIO: " + mySyncIO.getClass.getName) // mySyncIO: cats.effect.IO$Delay
    println("mySyncIO: " + getTypeTag(mySyncIO).tpe)
    val myAsyncIO = io
    println("myAsyncIO: " + myAsyncIO.getClass.getName) // myAsyncIO: cats.effect.IO$Async
    println("myAsyncIO: " + getTypeTag(myAsyncIO).tpe)
  }
}

Here is the output of running the above code:

  Foo: example.Program$Foo$@3e792ce3
  theType: example.Program$Foo$
  theType: example.Program.Foo.type
  mySyncIO: cats.effect.IO$Delay
  mySyncIO: cats.effect.IO[Unit]
  myAsyncIO: cats.effect.IO$Async
  myAsyncIO: cats.effect.IO[Unit]

First off, when I use Scala reflection, it appears that both the result of calling IO.async[Unit] and IO.apply return a cats.effect.IO[Unit], but when I use Java reflection the IO.apply[Unit] returns something of type cats.effect.IO$Delay, where Delay is a class defined inside cats.effect.IO$.

Looking into the cats code, I see:

  /** Corresponds to [[IO.apply]]. */
  private[effect] final case class Delay[+A](thunk: () => A)
    extends IO[A]

and

  /**
   * Corresponds to [[IO.async]] and other async definitions
   * (e.g. [[IO.race]], [[IO.shift]], etc)
   *
   * @param k is the "registration" function that starts the
   *        async process — receives an [[internals.IOConnection]]
   *        that keeps cancellation tokens.
   *
   * @param trampolineAfter is `true` if the
   *        [[cats.effect.internals.IORunLoop.RestartCallback]]
   *        should introduce a trampolined async boundary
   *        on calling the callback for transmitting the
   *        signal downstream
   */
  private[effect] final case class Async[+A](
    k: (IOConnection, Either[Throwable, A]  => Unit) => Unit,
    trampolineAfter: Boolean = false)
    extends IO[A]

Okay, so these are the real, concrete classes that are returned to me.

Now if I were in Javascript doing callbacks on a single thread and I wanted something analogous to:

  type CallbackType = (Either[Throwable, Unit]) => Unit

  def doHomework(subject: String, toRun: () => Unit): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    println(s"Starting my ${subject} homework." + "__" + Thread.currentThread().getName) // Starting my math homework.__main
    Thread.sleep(Long.MaxValue)
    try {
      val effectResult = toRun()
      callback.apply(Right.apply(effectResult)) // On success, the callback returns nothing
    } catch {
      case NonFatal(t) => callback.apply(Left(t)) // On failure, it returns an exception
    }
  }

  val io: IO[Unit] = doHomework("math", () => {
    println("Finished my homework" + "__" + Thread.currentThread().getName) // Finished my homework__main
  })

I might try something like this:

function doHomework(subject, callback) {
  alert(`Starting my ${subject} homework.`);
  callback();
}

doHomework('math', function() {
  alert('Finished my homework');
});

Now, let's say I wanted to translate this Javascript code into equivalent Scala code that will run on the JVM and use cats effects. First off, would the above Javascript callback code all run on a single thread? If so, how would I achieve the exact same thing using cats effects on the JVM?

Let's say I tried something like this:

  type CallbackType = (Either[Throwable, Unit]) => Unit

  def doHomework(subject: String, toRun: () => Unit): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    println(s"Starting my ${subject} homework.");
    try {
      callback.apply(Right.apply(toRun())) // On success, the callback returns nothing
    } catch {
      case NonFatal(t) => callback.apply(Left(t)) // On failure, it returns an exception
    }
  }

  doHomework("math", () => {
    println("Finished my homework");
  });

This looks pretty much the same as the Javascript code, right? All we have to do is run it (unsafely). Now we have a couple of options. We can run the result of doHomework with unsafeRunAsync or unsafeRunSync. Now the concrete type of the result of doHomework appears to be case class Async, so we can call unsafeRunAsync or unsafeRunSync on this object of type case class Async. But what if we did this:

  def doHomework2(subject: String, toRun: () => Unit): IO[Unit] = IO.apply[Unit] {
    println(s"Starting my ${subject} homework.");
    toRun()
  }

  doHomework2("math", () => {
    println("Finished my homework");
  });

This looks pretty much the same as the Javascript code, but it is not asynchronous, right? But we still have to run it. We have the same 2 options. We can run the result of doHomework2 with unsafeRunAsync or unsafeRunSync. Now the concrete type of the result of doHomework2 appears to be case class Delay, so we can call unsafeRunAsync or unsafeRunSync on this object of type case class Delay.

Thus, we can call unsafeRunAsync on an instance of case class Async, we can call unsafeRunSync on an instance of case class Async, we can call unsafeRunAsync on an instance of case class Delay, and we can call unsafeRunSync on an instance of case class Delay. This represents four different possibilities. What is the difference between these four possibilities? Is there any difference between them if there is only a single thread, main, and everything is run on that thread?
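To make the four combinations concrete, here is a minimal sketch, assuming cats-effect 1.x/2.x on the JVM. The object and method names are hypothetical. It suggests that how an IO was built (Delay or Async) and how it is run (unsafeRunSync or unsafeRunAsync) are independent choices: unsafeRunSync returns the result to the caller, while unsafeRunAsync hands it to a callback.

```scala
import cats.effect.IO
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object RunModesSketch {
  // Built with IO.apply: a Delay node
  val viaDelay: IO[String] = IO(Thread.currentThread().getName)

  // Built with IO.async: an Async node whose callback is invoked immediately,
  // on whatever thread runs the registration function
  val viaAsync: IO[String] =
    IO.async[String](cb => cb(Right(Thread.currentThread().getName)))

  // unsafeRunSync blocks the caller until the result is available
  def runBlocking(io: IO[String]): String = io.unsafeRunSync()

  // unsafeRunAsync passes the result to a callback instead of returning it;
  // the Promise is used here only to get the value back out for inspection
  def runWithCallback(io: IO[String]): String = {
    val p = Promise[String]()
    io.unsafeRunAsync { result => p.complete(result.toTry); () }
    Await.result(p.future, 5.seconds)
  }
}
```

Nothing in this sketch shifts to another thread, so all four combinations should report the name of the calling thread. With a single thread and no shift, the observable difference is only in how the result is delivered.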

Which of these possibilities is equivalent to executing the following Javascript code, but on the JVM:

function doHomework(subject, callback) {
  alert(`Starting my ${subject} homework.`);
  callback();
}

doHomework('math', function() {
  alert('Finished my homework');
});

If none of these things is equivalent to the above Javascript code, then can you write an example, without using fibers/join, that is equivalent to the above Javascript code, but runs on the JVM using cats effects?

JohnReedLOL commented 6 years ago

Actually, there is one more possibilities in addition to the four I listed above. What if I tried using runAsync and then running the resulting object of type SyncIO[Unit] with unsafeRunSync? Also, why is it that an instance of case class Delay and an instance of case class Async can be called with the user's choice of either unsafeRunSync or unsafeRunAsync, but the resulting object of type SyncIO[Unit] can only be called with unsafeRunSync?

JohnReedLOL commented 6 years ago

Also, there is an extra complication. You can call runAsync on an instance of case class Delay and you can also call runAsync on an instance of case class Async. And then after you get the resulting object of type SyncIO[Unit] you can call unsafeRunSync on that. So this represents two additional possibilities, not one. Now there are six ways of writing this. Six different combinations.

JohnReedLOL commented 6 years ago

So basically, the additional possibility would be something like this:

import cats.effect.{IO, SyncIO}

object Program {

  def doHomework3(subject: String): IO[Int] = IO.apply[Int] {
    // prints "Starting my math homework.__main"
    println(s"Starting my ${subject} homework." + "__" + Thread.currentThread().getName)
    // returns 33 to callback
    33
  }

  /*
    final def runAsync(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] = SyncIO {
    unsafeRunAsync(cb.andThen(_.unsafeRunAsyncAndForget()))
  }*/

  val syncIO: SyncIO[Unit] = doHomework3("math").runAsync((callback: Either[Throwable, Int]) => {
    // prints "Finished my homework__main"
    println("Finished my homework" + "__" + Thread.currentThread().getName)
    if (callback.isRight) {
      val right: Int = callback.right.get
      IO {
        // prints "33"
        println(right)
      }
    } else {
      // We are just ignoring the exception because the equivalent Javascript code is not handling an exception.
      IO {()}
    }
  })

  import scala.reflect.runtime.universe

  def getTypeTag[T: universe.TypeTag](obj: T): universe.TypeTag[T] = universe.typeTag[T]

  def main(args: Array[String]): Unit = {
    import scala.reflect.runtime.universe._
    println("syncIO toString: " + syncIO.toString) // SyncIO$733672688
    println("syncIO Java class name: " + syncIO.getClass.getName) // cats.effect.SyncIO
    println("syncIO Scala class name: " + typeOf[syncIO.type].typeSymbol.asClass) // class SyncIO
    println("syncIO Scala type: " + getTypeTag(syncIO).tpe) // cats.effect.SyncIO[Unit]
    syncIO.unsafeRunSync() // prints 33
  }
}

Would this code, if run on the JVM, be conceptually analogous to the following JavaScript code:

function doHomework(subject, callback) {
  // prints "Starting my math homework."
  alert(`Starting my ${subject} homework.`);
  // returns 33 to callback
  callback(33);
}

doHomework('math', function(right) {
  // prints "Finished my homework"
  alert('Finished my homework');
  // prints "33"
  alert(right);
});

Wait a second. In Javascript there isn't just one thread, main, and everything runs on that thread. There is at least one additional thread in the background for events/queuing, right?

But in the above Scala code run on the JVM, which I thought would be analogous to the Javascript code, everything is run on the main thread. There is no additional thread in the background for events/queuing.

So how do I translate the Javascript code into conceptually analogous scala cats effect code that is run on the JVM?

JohnReedLOL commented 6 years ago

Also, in the definition of runAsync, in the part that goes "cb.andThen(_.unsafeRunAsyncAndForget())", we are fire-and-forgetting the callback, right? So in the Scala code in the above comment, the part that prints "Finished my homework" and then prints "33", that is the part that is being fired and forgotten, right? The part before that, the part that prints "Starting my math homework", that is not fire-and-forget, right? The function doHomework, we are waiting for it to complete execution and return a value, correct?

Wait a second. callback is not of type Either[Throwable, Int]. Either[Throwable, Int] is just the input that goes into the callback. I think this is where I am confused. The actual type of the callback is:

type callbackType = Either[Throwable, A] => IO[Unit]

This makes much more sense now:

  val syncIO: SyncIO[Unit] = doHomework3("math").runAsync((callbackInput: Either[Throwable, Int]) => {
    // prints "Finished my homework__main"
    println("Finished my homework" + "__" + Thread.currentThread().getName)
    if (callbackInput.isRight) {
      val right: Int = callbackInput.right.get
      IO {
        // prints "33"
        println(right)
      }
    } else {
      IO {()}
    }
  })

JohnReedLOL commented 6 years ago

Okay, I realized why nothing made sense up until this point. If you look at your old documentation, you have code like this:

def send(c: Channel, chunk: Array[Byte]): IO[Unit] = {
  IO async { cb =>
    c.sendBytes(chunk, new Response[Unit] {
      def onError(t: Throwable) = cb(Left(t))
      def onSuccess(v: Unit) = cb(Right(()))
    })
  }
}

Now, if you look at the variable "cb", logically by looking at it a newcomer would assume that "cb" stands for "callback". Thus if you un-abbreviate the variable, you would get this:

def send(c: Channel, chunk: Array[Byte]): IO[Unit] = {
  IO async { callback =>
    c.sendBytes(chunk, new Response[Unit] {
      def onError(t: Throwable) = callback(Left(t))
      def onSuccess(v: Unit) = callback(Right(()))
    })
  }
}

If you look at the method signature:

def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A]

The type of the callback, k is (Either[Throwable, A] => Unit) => Unit.

The type of the callback is NOT (Either[Throwable, A] => Unit).

So if you take the code I just wrote and try to expand it out by defining a type variable:

type TypeOfCallback = (Either[Throwable, Unit] => Unit)
def send(c: Channel, chunk: Array[Byte]): IO[Unit] = {
  IO async { callback: TypeOfCallback =>
    c.sendBytes(chunk, new Response[Unit] {
      def onError(t: Throwable) = callback(Left(t))
      def onSuccess(v: Unit) = callback(Right(()))
    })
  }
}

Your code will compile, but this is wrong wrong wrong wrong wrong. The type of callback is NOT (Either[Throwable, A] => Unit). That is the type of the input to our callback. Your variable names should be like this:

type CallbackInput = (Either[Throwable, Unit] => Unit)
type Callback = (Either[Throwable, Unit] => Unit) => Unit
def send(c: Channel, chunk: Array[Byte]): IO[Unit] = {
  IO async { callbackInput: CallbackInput =>
    c.sendBytes(chunk, new Response[Unit] {
      def onError(t: Throwable) = callbackInput(Left(t))
      def onSuccess(v: Unit) = callbackInput(Right(()))
    })
  }
}

Now it makes sense. Now the type has an appropriate, descriptive name that accurately reflects what it actually is. Now I can write this:


  type CallbackInput = Either[Throwable, Int] // Type of input to callback
  type Callback = (CallbackInput) => IO[Unit] // Type of callback itself

  val callback: Callback = (callbackInput: CallbackInput) => {
    // prints "Finished my homework__main"
    println("Finished my homework" + "__" + Thread.currentThread().getName)
    if (callbackInput.isRight) {
      val right: Int = callbackInput.right.get
      IO {
        // prints "33"
        println(right)
      }
    } else {
      // We are just ignoring the exception because the equivalent Javascript code is not handling an exception.
      IO {()}
    }
  }

  val syncIO: SyncIO[Unit] = doHomework3("math").runAsync(callback)

Now it makes sense. Your variable naming was wrong.

JohnReedLOL commented 6 years ago

So now we have this:

import cats.effect.{IO, SyncIO}

object Program {

  def doHomework(subject: String): IO[Int] = IO[Int] {
    // prints "Starting my math homework.__main"
    println(s"Starting my ${subject} homework." + "__" + Thread.currentThread().getName)
    33
  }

  type CallbackInput = Either[Throwable, Int]
  type Callback = (Either[Throwable, Int]) => IO[Unit]

  val callback: Callback = (callbackInput: CallbackInput) => {
    // prints "Finished my homework__main"
    println("Finished my homework" + "__" + Thread.currentThread().getName)
    if (callbackInput.isRight) {
      val right: Int = callbackInput.right.get
      IO {
        // prints "33"
        println(right)
      }
    } else {
      IO {()}
    }
  }

  def main(args: Array[String]): Unit = {
    doHomework("math").runAsync(callback).unsafeRunSync()
  }
}

Which is conceptually equivalent to this (we can assume that all of our Javascript code is running on the main thread):

function doHomework(subject, callback) {
  // prints "Starting my math homework."
  alert(`Starting my ${subject} homework.`);
  // returns 33 to callback
  callback(33);
}

const callback = (right) => {
  // prints "Finished my homework"
  alert('Finished my homework');
  // prints "33"
  alert(right);
}

doHomework('math', callback);

SyncIO appears to just be a wrapper for an IO[Unit], with the IO[Unit] in this case being an instance of case class Delay[+A](thunk: () => A) extends IO[A], I assume because doHomework was created with IO.apply.

Now if we assume that all our Javascript code is run on a single thread, now these two things are the same. And if we want our callback to run on a background worker, we can just do this:

import java.util.concurrent.Executors
import cats.effect.IO
import scala.concurrent.ExecutionContext

object Program {

  val backgroundWorker = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  def doHomework(subject: String): IO[Int] = IO.apply{
    // prints "Starting my math homework.__main"
    println(s"Starting my ${subject} homework." + "__" + Thread.currentThread().getName)
    33
  }

  type CallbackInput = Either[Throwable, Int]
  type Callback = (Either[Throwable, Int]) => IO[Unit]

  val callback: Callback = (callbackInput: CallbackInput) => IO[Unit]{
    backgroundWorker.execute(new Runnable {
      override def run(): Unit = {
        // prints "Finished my homework__pool-1-thread-1"
        println("Finished my homework" + "__" + Thread.currentThread().getName)
        if (callbackInput.isRight) {
          val right: Int = callbackInput.right.get
            // prints "33"
            println(right)
        }
      }
    })
  }

  def main(args: Array[String]): Unit = {
    doHomework("math").runAsync(callback).unsafeRunSync()
  }
}

In this case the callback is our async action - we do not wait for our callback to complete. We "fire and forget" the callback passed into runAsync. The doing of the homework is run synchronously and the callback is run asynchronously. I think this part makes sense.

So I guess the difference between this:

  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callbackInput: CallbackInput =>
    // "callbackInput" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        try {
          val nothing: Unit = toRun() // Note: This line executes the body and returns nothing, which is of type Unit.
          callbackInput(Right(nothing)) // On success, the callback receives nothing
        } catch {
          case NonFatal(t) => callbackInput(Left(t)) // On failure, it returns an exception
        }
      }
    })
  }

and this:

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

is that forkSync executes the Runnable fire-and-forget style and forkAsync waits for the Runnable to complete.

How does forkAsync know to wait for the Runnable to complete? Because cats is observing callbackInput. The function callbackInput should only be called once - it is under observation. And when it is called (i.e. the first time it is called), cats retrieves a value of type Either[Throwable, A], where A is the return type of the thunk we passed in and executed asynchronously - it is the type of the resulting value we get when we flatMap over our instance of case class Async, which extends IO.

Now I understand.

RaasAhsan commented 6 years ago

def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A]

The argument to async isn't a callback. It's a function that accepts a callback and returns nothing. It is designed to lift asynchronous code into IO. The argument of type Either[Throwable, A] => Unit is a callback that YOU are responsible for invoking once the asynchronous computation has completed.

When the IO runtime encounters an async, it begins executing the function you passed it and supplies it a callback that you will invoke once your asynchronous action has completed.

Typically, we are responsible as users for supplying callbacks and the environment will consume the callback and invoke it:

setTimeout(() => {
  console.log('callback has been invoked after 5 seconds');
}, 5000);

But this async function flips the responsibilities: instead the runtime supplies the callback to continue evaluation and we as users are responsible for consuming and invoking it.
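A minimal sketch of that flipped responsibility, assuming cats-effect 1.x/2.x and Scala 2.12 or later (for Try.toEither). The function legacyLookup is a hypothetical stand-in for an existing callback-style API; it is not part of any library.

```scala
import cats.effect.IO
import scala.concurrent.{ExecutionContext, Future}

object CallbackSketch {
  // Hypothetical existing asynchronous API that we want to lift into IO
  def legacyLookup(key: String)(implicit ec: ExecutionContext): Future[Int] =
    Future(key.length)

  def lookup(key: String)(implicit ec: ExecutionContext): IO[Int] =
    IO.async[Int] { cb =>
      // `cb` is supplied by the IO runtime; our only job is to invoke it
      // with Right(value) or Left(error) once the result is known
      legacyLookup(key).onComplete(result => cb(result.toEither))
    }
}
```

The caller of lookup never sees a callback: it gets an IO[Int] and can flatMap over it. For Future specifically, cats-effect also offers IO.fromFuture; the hand-written version is shown only to make the role of cb visible.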

SystemFw commented 6 years ago

Your variable naming was wrong.

No, it was not.

 IO async { cb =>
     ... // code that will call the callback `cb`
  }

Now, if you look at the variable "cb", logically by looking at it a newcomer would assume that "cb" stands for "callback".

and that's exactly what it stands for, because that's what it is

If you look at the method signature: def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] The type of the callback, k is (Either[Throwable, A] => Unit) => Unit. The type of the callback is NOT (Either[Throwable, A] => Unit).

k is not the callback; it is the continuation (commonly known as k in the literature), which is a function that accepts a callback.

So when you write it down you have input =>, but the input is the callback, and therefore async { cb =>

As @RaasAhsan correctly said, when the IO runtime encounters an Async node, it begins executing the function you passed it and supplies it a callback that you will invoke once your asynchronous action has completed. This function is extremely useful to embed async code in IO, precisely to avoid dealing with callbacks.

On a separate note, your examples don't show much because they all return Unit/F[Unit], which doesn't really help in distinguishing sync and async code. Try having examples which return F[Int] and you'll see why you need async more clearly. Your forkSync is still asynchronous because it relies on thread pool submission. If toRun was () => A you'd need async (or equivalent behaviour like a Promise) to return F[A]

JohnReedLOL commented 6 years ago

Personally, I think that the fact that the user cannot tell if an object is an instance of case class Delay or case class Async is a design flaw. The user of the code should be able to hover their mouse over an IO and see, based on the type of the object, whether it is "fire and forget" (Delay) or "observe the input of our callback" (Async).

The way it is designed, this detail is hidden away from the user, and the only way for them to be able to tell is by going to the place where the IO was constructed. This is a hassle and it makes it impossible for a user to understand what is going on just by looking at say, a diff.

SystemFw commented 6 years ago

Personally, I think that the fact that the user cannot tell if an object is an instance of case class Delay or case class Async is a design flaw.

Sure, however you prefer.

The user of the code should be able to hover their mouse over an IO and see, based on the type of the object, whether it is "fire and forget" (Delay) or "observe the input of our callback" (Async).

Even if you could see that, you still couldn't know for sure whether an F[Unit] is fire-and-forget or not. You can build two Delay nodes, one of which is fire and forget and the other is not. Simply submit the => Unit to a thread pool for fire and forget semantics, or return it directly for the other case.
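A minimal sketch of those two Delay nodes, assuming cats-effect 1.x/2.x. The names direct and fireAndForget are hypothetical. Both are built with IO.apply and both have type IO[Unit], so neither the static type nor the node kind distinguishes them.

```scala
import cats.effect.IO
import scala.concurrent.ExecutionContext

object DelayFlavoursSketch {
  // Delay node that waits: the IO completes only after `work` has run
  def direct(work: () => Unit): IO[Unit] =
    IO(work())

  // Delay node that is fire-and-forget: the IO completes as soon as
  // `work` has been submitted to the pool, whether or not it has run yet
  def fireAndForget(work: () => Unit)(ec: ExecutionContext): IO[Unit] =
    IO(ec.execute(new Runnable { def run(): Unit = work() }))
}
```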

More generally, and this doesn't depend on cats-effect, given foo: Unit with side effects, there is no way to know whether it's fire-and-forget or not. Since cats-effect's delay and async wrap such code, the same limitations apply.

Conversely, there are great reasons to keep the IO representation private, given that it builds a reasonable and pure algebra on top of messy things (concurrency, resource safety, side-effects) that you don't want the user to tamper with.

JohnReedLOL commented 6 years ago

Just read your comments. So in IO.async:

type ReturnType = Unit
type CallbackInput = Either[Throwable, ReturnType]
type CallbackType = CallbackInput => Unit
type ContinuationType = CallbackType => Unit

Basically if we change ReturnType to Int async would actually be useful because we would be waiting for an int. We would be able to distinguish because we would not be forgetting whatever it is that the code returns.

I guess what is a little unclear still is why

type CallbackType = Either[Throwable, ReturnType] => Unit

and not say Either[Throwable, ReturnType]

Let's say we change it to this:


  type ReturnType = Int
  type CallbackInput = Either[Throwable, ReturnType]
  type CallbackType = CallbackInput => Unit
  type ContinuationType = CallbackType => Unit

  def forkAsync(toRun: () => Int)(executor: ExecutorService): IO[Int] = IO.async[Int] { callback: CallbackType =>
    executor.execute(new Runnable {
      def run(): Unit = {
        try {
          val int: Int = toRun()
          val right: Right[Nothing, ReturnType] = Right.apply(int)
          callback(right)
        } catch {
          case NonFatal(t) => callback(Left(t))
        }
      }
    })
  }

If you look at the line callback(right), callback is taking in an object of type Right[Nothing, ReturnType], which extends Either[Nothing, ReturnType]. Because Either is covariant, that is also an Either[Throwable, ReturnType], i.e. a CallbackInput. I would assume that then this callback returns Unit.

JohnReedLOL commented 6 years ago

Oh, it's because our callback executes on right and returns Unit, which is returned by our continuation k, so the continuation k returns Unit.

SystemFw commented 6 years ago

The continuation k returns Unit because Executor.execute returns Unit
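
To make the shape of those types concrete, here is a small sketch in plain Scala. It is not cats-effect code: runAndWait and computeOn are hypothetical helpers, and a real runtime would not block a thread with Await the way this does. The point is only that the registration function returns Unit straight away, while the result arrives later through the callback:

```scala
import java.util.concurrent.ExecutorService
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object CallbackDemo {
  type Callback[A] = Either[Throwable, A] => Unit

  // Hypothetical stand-in for a runtime: hand a callback to `register`,
  // then wait until that callback has been invoked.
  def runAndWait[A](register: Callback[A] => Unit): A = {
    val promise = Promise[A]()
    val callback: Callback[A] = {
      case Right(a) =>
        promise.trySuccess(a)
        ()
      case Left(t) =>
        promise.tryFailure(t)
        ()
    }
    register(callback) // returns Unit immediately; it only schedules the work
    Await.result(promise.future, 5.seconds) // blocking is for the sketch only
  }

  // Runs the thunk on the given pool and reports its outcome through the callback.
  def computeOn(pool: ExecutorService)(thunk: () => Int): Int =
    runAndWait[Int] { callback =>
      pool.execute(new Runnable {
        def run(): Unit =
          try callback(Right(thunk()))
          catch { case NonFatal(t) => callback(Left(t)) }
      })
    }
}
```

Here pool.execute returns Unit, so the function passed to runAndWait returns Unit as well. The Int only ever travels through callback.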

SystemFw commented 6 years ago

Also, you should use delay and async to wrap already existing side-effectful code, for example to wrap a non-blocking network call. If you want to build concurrent behaviour with IO, working in terms of start + Fiber is a lot nicer and easier, as shown in the comment above by Avasil.

JohnReedLOL commented 6 years ago

Oh, I see. IO.delay is the same as IO.apply. IO.delay and IO.apply are for wrapping side-effectful code that was already written, like in your case a non-blocking network call. But if I am writing fresh, concurrent code it is better to use start/join, which kind of reminds me of goroutines a little bit. Okay, now I think I actually understand how to use cats-effect.

I guess for someone who doesn't have a pure FP or math background it might be a little more obvious if variables weren't abbreviated. Like to me this is super obvious:

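  import java.util.concurrent.{ExecutorService, Executors}
  import scala.util.control.NonFatal
  import cats.effect.IO

  type ResultType = Int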
  type CallbackInputType = Either[Throwable, ResultType]
  type CallbackType = CallbackInputType => Unit
  type ContinuationType = CallbackType => Unit

  val executor: ExecutorService = Executors.newSingleThreadExecutor()

  val continuation: ContinuationType = (callback: CallbackType) => {
    executor.execute(new Runnable {
      override def run(): Unit = {
        val result: ResultType = {
          println("Hello World");
          7
        }
        try {
          val right: CallbackInputType = Right(result)
          callback(right)
        } catch {
          case NonFatal(t) => callback(Left(t))
        }
      }
    })
  }

  def asyncHelloWorld: IO[ResultType] = IO.async(continuation)

This is not as obvious, at least not the first time you see it:

  val es = Executors.newSingleThreadExecutor()

  val k: (Either[Throwable, Int] => Unit) => Unit = (c: Either[Throwable, Int] => Unit) => {
    es.execute(new Runnable {
      override def run(): Unit = {
        val r: Int = {
          println("Hello World");
          7
        }
        try {
          c(Right(r))
        } catch {
          case NonFatal(t) => c(Left(t))
        }
      }
    })
  }

  def asyncHelloWorld: IO[Int] = IO.async(k)

I had never heard of "continuations", so for someone without a pure FP or advanced mathematics background the meaning of the variable "k" is totally not obvious. Also, if you want to look up the meaning, typing the letter "k" into Google won't tell you what continuations are. I guess if things weren't abbreviated and you named some of your types, the code would be more self-documenting, at least to newcomers.

JohnReedLOL commented 6 years ago

But yeah, I think I see. The purpose of something like forkAsync is so that you can run already-written side-effectful code. Like you could just pass your non-blocking network call into forkAsync as a thunk and it can run asynchronously and you can get back the result when it is done. And if it's a non-blocking network call it shouldn't block your entire application. So that's what it's for: you put non-blocking thunks inside forkAsync. Okay, now I think I understand how to use cats-effect :-)

JohnReedLOL commented 6 years ago

For anyone who isn't a Scala expert, I provided my own explanation of how it works here: https://stackoverflow.com/a/52465791/4993125

SystemFw commented 6 years ago

This is the OP providing his own answer. IO.async works via a Scala language feature called continuations. Basically, with continuations you can capture the code after your shift (but inside your reset) and apply it like so:

That's not true. cats-effect doesn't use scala.util.continuations with reset and shift at all. "Continuation" has a very general meaning of "function that captures the rest of the computation", for example in the term "continuation-passing style", or in the argument to flatMap (given F[A] where F[_] forms a Monad, A => F[B] is known as the continuation k).

cats-effect just uses normal Scala, and not scala.util.continuations, which encodes what are called delimited continuations as a specific language feature.
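
The general meaning of "continuation" can be shown with nothing but ordinary functions. The sketch below is plain Scala with made-up names (add, addCps, viaFlatMap) and uses Option only as a convenient example of a type with flatMap:

```scala
object ContinuationDemo {
  // Direct style: the result is returned to the caller.
  def add(a: Int, b: Int): Int = a + b

  // Continuation-passing style: the result is handed to k,
  // a function that represents "the rest of the computation".
  def addCps(a: Int, b: Int)(k: Int => Unit): Unit = k(a + b)

  // The function passed to flatMap plays the same role:
  // it says what to do next with the value, if there is one.
  def viaFlatMap(x: Option[Int]): Option[String] =
    x.flatMap(n => Some("got " + n))
}
```

No compiler plugin or special language feature is involved here. k is just a parameter whose type happens to be a function.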

JohnReedLOL commented 6 years ago

Oh. So it is a continuation, not scala.util.continuations. This whole pure FP thing is harder to master than I initially thought.

JohnReedLOL commented 6 years ago

@SystemFw One more thing. This is sort of off-topic, but I would like to send a reified program as a message to another machine and have that reified program be executed on the machine which receives it. Maybe by like serializing an IO Monad or Monix Task or something like that. Is there a way to do that which does not involve making a code change to an existing library or doing anything super complicated?

SystemFw commented 6 years ago

One more thing. This is sort of off-topic, but I would like to send a reified program as a message to another machine and have that reified program be executed on the machine which receives it. Maybe by like serializing an IO Monad or Monix Task or something like that. Is there a way to do that which does not involve making a code change to an existing library or doing anything super complicated?

No, this is a super complicated thing because IO or Task FlatMap nodes have an A => F[B], and shipping that over the network involves serialising a function, which is hard. A => F[B] is necessary to encode arbitrary control flow. If your "program" is simpler (for example, it is monoidal: just a list of commands where subsequent commands don't have an arbitrary dependency on previous ones), then the problem also becomes easier.

In Haskell, Cloud Haskell and transient solve that problem. The Unison language was also created to solve that. In Scala, software that needs to do that, like Spark, ends up being very complex as well.
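
For the simpler "list of commands" case mentioned above, a minimal sketch could look like the following. It is plain Scala with Java serialization, and the Command type and its interpreter are hypothetical, invented for this example. Because the program is plain data with no functions in it, there is nothing to close over:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CommandListDemo {
  // Hypothetical command set; case classes are Serializable by default.
  sealed trait Command
  final case class Print(message: String) extends Command
  final case class Add(amount: Int) extends Command

  // Sender side: turn the list of commands into bytes.
  def serialize(program: List[Command]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(program)
    out.close()
    bytes.toByteArray
  }

  // Receiver side: read the list back (the Command classes must exist here too).
  def deserialize(data: Array[Byte]): List[Command] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[List[Command]]
    finally in.close()
  }

  // Interprets the commands in order; returns the final total and the messages seen.
  def interpret(program: List[Command]): (Int, List[String]) =
    program.foldLeft((0, List.empty[String])) {
      case ((total, log), Print(m)) => (total, log :+ m)
      case ((total, log), Add(n))   => (total + n, log)
    }
}
```

The trade-off is the one described above: no command can decide what comes next based on the result of an earlier one, which is exactly what A => F[B] would allow.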

neko-kai commented 6 years ago

@SystemFw Uh, no, serializing JVM classes to bytecode is rather trivial – even Serializable does the job, the complexity in Spark is from serialising closure captures on local data.

SystemFw commented 6 years ago

the complexity in Spark is from serialising closure captures on local data.

Sure, and you have a similar problem here. When given a function A => F[B], it might close over things, and now you have to deal with the closure capture. It's not impossible, but it's not easy either.
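
The closure-capture problem can be reproduced in a few lines. This sketch assumes Scala 2.12 or later, where function literals are serializable, and uses only Java serialization. Connection, pureGreeter, connectedGreeter and canSerialize are hypothetical names for the example:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical resource that is NOT Serializable, standing in for a socket or connection.
  final class Connection(val prefix: String)

  // Closes over a String only.
  def pureGreeter(prefix: String): String => String =
    name => prefix + name

  // Closes over the non-serializable Connection.
  def connectedGreeter(conn: Connection): String => String =
    name => conn.prefix + name

  // True if Java serialization accepts the value,
  // false if it runs into an object that is not Serializable.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Both functions have the same type, String => String, so the type gives no hint about which one can be shipped. Whether serialization succeeds depends on what each closure happens to capture.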

JohnReedLOL commented 6 years ago

No, this is a super complicated thing

Uhh, that sucks. Okay, maybe instead of sending an entire program, how about just sending a single function that returns Unit? According to Stack Overflow I can serialize a Java lambda (https://stackoverflow.com/questions/22807912/how-to-serialize-a-lambda), and instances of the Scala Function class extend Serializable, as do case classes. And according to https://www.lyh.me/lambda-serialization.html#.W6exIpNKjq0 , even if a class isn't serializable, as long as the class exists on both ends I can hold it in a @transient lazy val: instead of getting sent over the wire, that field is sent as null and then gets constructed at the other end the first time it is used. Plus there is https://github.com/twitter/chill , which can apparently serialize primitives, maps, functions, tuples, etc., and even Java classes that don't extend Serializable as long as they have a no-arg constructor.

But yeah, implementing my own closure cleaner is out of the question and I don't think sending an IO or a Monix Task over the wire would be as simple as using https://github.com/twitter/chill and doing meatlockerInstance.copy on receipt. So yeah, assuming that serializing an IO isn't practical I guess I'll just send over a function which returns Unit and only uses case classes.

To be honest, if you were only able to serialize an instance of cats.effect.IO$Delay that only used components that were serializable with https://github.com/twitter/chill (so basically everything except for classes which do not have a no-arg constructor), I would probably use that over just sending over an instance of Function0[Unit].

Also, since I can do the @transient lazy val thing for fields which do not have a no-arg constructor, I would just need to be careful about causing deadlock because lazy vals synchronize on an object. That just leaves local variables whose classes do not have a no-arg constructor and fields which both do not have a no-arg constructor and need to be accessed by multiple threads simultaneously.

I think it wouldn't be too hard to define a Function0[Unit] that meets those requirements. I haven't really tried it on an instance of cats.effect.IO$Delay, but I could imagine that I might run into some issues.

JohnReedLOL commented 6 years ago

Also, I see a problem. Kryo can't serialize inner classes because they have synthetic references to their outer class. For inner classes you end up having to either make them static with no reference to their outer class or you have to use Java serialization. Also, case classes as far as I am aware do not have a default no-arg constructor, so I expect that I would have to either define one or use Java Serialization there as well. I'm not sure what additional issues I would run into if I tried to serialize a cats.effect.IO$Delay that I wouldn't run into if I tried to serialize a Function0[Unit], but if it were equally difficult to do both I might go with the cats.effect.IO$Delay just to integrate with IO code on the receiving end.

JohnReedLOL commented 6 years ago

Okay. So my case class that wraps cats.effect.IO.delay{println("Hello World!!!!!!!!")} is definitely serializable, as is foo in the below example:

  val foo: IO[Unit] = for {
    seven <- IO[Int]{println("This prints something and returns 7"); 7}
    _ <- IO.delay { println("Hello World!!!!!!!! " + seven.toString) }
  } yield ()

I am going to go ahead and send these things around as Akka messages. If this is a bad idea, let me know.

LukaJCB commented 6 years ago

Those IO values don't close over any environment so I guess it works. I really would not recommend sending them around as Akka messages though.