atnos-org / eff

Eff monad for cats - https://atnos-org.github.io/eff
MIT License
581 stars 78 forks source link

Explain how to set Monix ExecutionModel of a Task containing an Eff-subprogram #93

Closed benhutchison closed 6 years ago

benhutchison commented 7 years ago

Current Eff API allows putting an Eff sub-program into a Monix Task effect: def taskSuspend[R :_task, A](task: =>Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A]

But I am unclear how to obtain precise control of "forking" - specifically ExecutionModel - for Eff tasks. Motivation is to allow tasks collections to execute concurrently.

Would like to understand & document how to write an Eff program with an Async Boundary that enables concurrent execution of Eff-subprograms.

It not clear that this is an Eff "framework", versus "Eff ecosystem" issue, but Im entering it here to keep track of this open question.

etorreborre commented 7 years ago

My understanding of asyncBoundary is not so much that a task is forked but more that the execution should switch back to the original Scheduler after executing a given forked task which may execute on a specific and different Scheduler. So in your example you still need to create forked tasks somewhere.

benhutchison commented 7 years ago

Monix docs describe it thus, but thats actually one use case of async boundaries. Underneath, Monix Task.fork is implemented same as asyncBoundary.

The general meaning of a boundary is: don't keep going on current thread, go back to whatever Scheduler is configured for the Task, be it the default or an override, and dispatch again.

At the bottom, there is a loop where tasks get run. At the end of each task, it decides whether to keep going on current thread (tail recursively) or to break out back to the scheduler.

benhutchison commented 7 years ago

I built another micro benchmark to explore the issue, that shows Eff behaving identically to plain monix. http://scastie.org/30599

Need to understand what the difference is in the earlier failing case

benhutchison commented 7 years ago

Ok, I think Ive figured out the correct pattern to insert Async Boundaries between Task[Eff[R, A]]s. I'll write a little article for the cookbook when time permits

val effSubprogram: Eff[R, A] = ???
taskSuspend(Task.eval(effSubprogram).asyncBoundary) >> taskSuspend(Task.eval(effSubprogram))
etorreborre commented 7 years ago

I see taskSuspend + putting the subcomputation inside an asyncBoundary does the trick, nice.

benhutchison commented 7 years ago

I spent this sunny Sunday morning lying on the couch with a mission to write some docs. I've reached a blockage, here's where I'm at:

edmundnoble commented 7 years ago

100% agreed. Thanks for your work :)

etorreborre commented 7 years ago

@benhutchison all good with me too. Making a separate submodule for docs depending on all other modules make sense, this is also what I do in specs2.

benhutchison commented 7 years ago

Great, glad we're aligned.

On related note: my understand of concurrency in Monix has come leaps and bounds since my questions about Task.fork a fortnight ago. IMRO (R=Recent) the elegant way to use Monix is to select your "concurrency level" through the Scheduler used at execute time, eg:

//async boundary every 32 Tasks executed
//experiments suggests its the optimum parallelism for my directory scanner on my Mac SSD drive
implicit val s = Scheduler(ExecutionModel.BatchedExecution(32))

Monix default is forking every 1024 tasks.

If you really need to control precisely where the boundaries occur (eg after slow, blocking calls like JDBC),thats possible taskSuspend(Task.eval(effSubprogram).asyncBoundary).

Anyway, all that leads me to consider @deprecate or delete TaskInterpretation.taskFork in the API before it gets adoption, shrink the API surface area, and direct users to one of above approaches...?

etorreborre commented 7 years ago

Maybe we can, in the same PR:

edmundnoble commented 7 years ago

Concurrency level and liveness level are not the same, @benhutchison. It appears that the problem is that you have too many threads. As I've said, multiple threads accessing the same disk should not increase your throughput; the only reason it wouldn't is that your benchmark becomes more CPU-bound the more frequently the scheduler suspends. Pinging @alexandru, if I have any of this wrong.

benhutchison commented 7 years ago

multiple threads accessing the same disk should not increase your throughput

You've mentioned this a few times but im sceptical, for typical modern devices fronted by a filesystem. My laptop, tablet and phones use an SSD which are claimed to well support concurrent access. And in the enterprise andcloud context, most filesystems are backed by lots of concurrent disk heads.

Empirically my testing on https://github.com/benhutchison/GettingWorkDoneWithExtensibleEffects/tree/master/exercise4 found that a monix batch size of 32 was optimal on my ssd, much less than the 1024 default, and IMO implying FS concurency.

alexandru commented 7 years ago

Hi guys,

Just to make it clear, the ExecutionModel is about configuring fairness - the general idea is that keeping the run-loop / current thread occupied forever is bad if you have other tasks that need to get a chance to execute. This becomes really clear on top of Javascript, where if you have long loops, you end up keeping the user interface unresponsive, but is also important on top of the JVM.

If you recall, Scala's Future ends up sending a Runnable in the ExecutionContext on each operation. This is sort of terrible for performance reasons, because it destroys cache locality, it introduces unnecessary context switches, etc. The Scalaz Task however keeps that execution on the current thread for as long as that is possible, potentially keeping the current thread / call stack occupied forever. But in a system that does stuff in parallel, this isn't OK for fairness reasons.

So in Monix when you configure BatchedExecution(128) what happens is that the Task's run-loop will execute 128 flatMap operations and then introduce a fork (i.e. it sends a Runnable in the ExecutionContext). This ends up being a sort of cooperative multithreading, which for example people do on top of Javascript by introducing setTimeout calls in their pipeline.

If you configure SynchronousExecution it ends up behaving like Scalaz's Task, meaning that execution will be kept on the current thread for as long as possible. If you configure AlwaysAsyncExecution it ends up behaving like Scala's Future, meaning that each operation (e.g. map, etc) ends up sending a Runnable in the thread-pool.

fa.asyncBoundary is equivalent with fa.flatMap(a => Task.unit.fork.map(_ => a)). What it does is to make the following transformations to happen on the default scheduler, or on a specified on. So for example ...

Task.eval(readFile())
  .executeOn(io) // will execute readFile on the IO thread-pool
  .asyncBoundary // jumps on the default thread-pool for the rest
  .map(_.toString) // will get executed on the default thread-pool

I've touched on these subjects in my presentation at Scala Days. At this point only the slides are available, hopefully there will be a video: https://speakerdeck.com/alexandru/monix-task-lazy-async-and-awesome-scala-days-chicago-2017

edmundnoble commented 7 years ago

The fact that there is an increase in throughput for your particular case does not mean that there is "more concurrency" with a higher liveness level. All that's happening is more thread pool submissions, and it has nothing to do with using traverseA. Even more telling is the fact you aren't even using Task.fork, just Task.eval, so your IO must be synchronous as written even using traverseA, so I don't see how concurrency could even factor in.

Edit: Use a thread pool for your I/O and fork to it when you call Files.list, and the size of it will be the actual tuning point for your program. You will be able to see yourself whether or not my statement about parallel I/O is correct.

benhutchison commented 7 years ago

@edmundnoble, we seem increasingly to be talking at cross-purposes. I dont think your understanding of what BatchedExecution implies is accurate, as your comments above suggest a perceived need for Task.fork, when monix adds forks by default. Have you review the links into monix codebase Ive posted above [https://github.com/atnos-org/eff/issues/93#issuecomment-294811501] ?

alexandru commented 7 years ago

@edmundnoble @benhutchison here's more details for how that works ...

Introducing an explicit fork operation will reset the internal "counter" based on which automatic forks are introduced. So if you configure BatchedExecution(128) and on iteration 64 you flatMap on a Task.fork(fa), then it will take another ~ 128 iterations to introduce the next automatic fork.

So usually it's not a problem to fork explicitly, it won't do more work than necessary.

alexandru commented 7 years ago

Also, that internal counter gets reset on async boundaries automatically, within reason of course. Counter is kept in a ThreadLocal and in Task.unsafeCreate you also get a handle which you can use to reset that counter manually (to make that work on JS as well).

These are implementation details tough, just mentioning that there was some effort in making this work OK 😏

alexandru commented 7 years ago

@benhutchison one more detail, you don't necessarily need to configure the implicit Scheduler ... if you have a Task-based loop, you can configure just that to run with a custom ExecutionModel that overrides the scheduler-injected one:

task.executeWithModel(BatchedExecution(32))

Not sure if that helps or not.

edmundnoble commented 7 years ago

Monix adds asynchronous boundaries, where you stop executing the Task and send the rest of the Task to a Scheduler to execute. I am talking about forking onto a different scheduler and continuing execution on the current scheduler, because your I/O is synchronous as long as you are using Task.eval, and thus there is no concurrency occurring. Even if the batcher is re-submitting the Task to the global scheduler, there is no concurrency occurring. @alexandru is this on target?

edmundnoble commented 7 years ago

Put another way: without a Task.fork call to a different scheduler than the Task is already running on, or Task.async call, there can be no parallelism or concurrency with Monix Task.

benhutchison commented 7 years ago

@edmundnoble There is concurrency and the batch size matters. Ive finally found time to put together an example showing it: https://github.com/benhutchison/TaskConcurrencyExperiment

edmundnoble commented 7 years ago

I've thought about it, and after looking at your example I've realized that I was wrong about the threadpool-shifting behavior not introducing task concurrency because I assumed the default thread pool had one thread. So every thread shift that happens due to Monix's batching actually does introduce concurrency, assuming your default threadpool is multi-threaded.

However, as you can see, relying on this batching concurrency for your program's concurrent execution is error-prone and fuzzy. The reason is that introducing extra synchronous steps in your program will change when the forking happens; keep in mind as well the incredible overhead of threadpool submissions, which is the reason Task is so much more performant than Future. If you use explicit forking in your program you can do it exactly when it's needed for maximum concurrency and no more. This becomes even more important if for example you are blocking threads, because in that case it's necessary for high throughput for the rest of your app that you keep that blocking on a dedicated, separate thread pool and that cannot be provided by this strategy. Paging @alexandru again, sorry bud, I'm wondering about your opinion again.

alexandru commented 7 years ago

@edmundnoble

So every thread shift that happens due to Monix's batching actually does introduce concurrency, assuming your default threadpool is multi-threaded.

I'm not sure what you mean by concurrency here. In my mind a system has concurrency in it when it has two or more components that process stuff out of order, producing unordered results, thus requiring synchronisation.

I don't see such a thing in this sample.

keep in mind as well the incredible overhead of threadpool submissions, which is the reason Task is so much more performant than Future

Scalaz's Task might have better throughput than Future, but Scala's Future is more fair. When you've got tasks that execute concurrently, fairness is a pretty big issue, in some cases being more important than the throughput.

As I mentioned before (https://github.com/atnos-org/eff/issues/93#issuecomment-298222560), Monix does batched execution by default in order to strike a balance between fairness and throughput and it does a pretty good job at that.

If you use explicit forking in your program you can do it exactly when it's needed for maximum concurrency and no more.

I don't see that as being a virtue. Manual control is fine when you want to start something on a special thread-pool meant for I/O, in order to not block the current thread and people do that because they've heard somewhere that having a special thread-pool for blocking I/O is cool.

But preserving fairness is a different thing entirely and I haven't seen regular users being able to cope with it.

In my mind Scalaz's and FS2's Task implementations are totally not adequate for the purpose of doing concurrency and the only reason for why people don't complain is because most of us are working on web services with a classic request / response model where stuff is embarrassingly parallelizable already due to the underlying framework, which can usually cope with the rare long request that blocks threads for indeterminate amounts of time. Hop over on Node.js though and things go down the drain really quickly.

Also I already mentioned above (https://github.com/atnos-org/eff/issues/93#issuecomment-299167628) that when introducing explicit forking, Monix's Task implementation does not introduce extra work because that counter gets reset. Plus you can configure synchronous execution. So if you're after manual control, then you can get it.

And speaking of Eff and parametric F[_] types, there's not much control you can do when all you can use is an Applicative or a Monad instance 😉

alexandru commented 7 years ago

One more thing:

This becomes even more important if for example you are blocking threads, because in that case it's necessary for high throughput for the rest of your app that you keep that blocking on a dedicated, separate thread pool

Actually the problem with blocking threads is also one of fairness, not throughput. Submitting that thread in the IO thread-pool and then jumping back to the global thread-pool introduces extra context switching. You could say that you want to maximize CPU usage, but that's just a side effect, because users don't care about that - what a user cares about is to not wait 10 seconds for a response from the UI after clicking a button.

... and that cannot be provided by this strategy.

Why do you say that? I'm not sure if I understand.

edmundnoble commented 7 years ago

But of course it can be, why do you say that?

I say that because the asynchronous boundaries inserted by Monix to ensure fairness do not shift to a different thread pool and it wouldn't make sense if they did.

I don't see that as being a virtue. Manual control is fine when you want to start something on a special thread-pool meant for I/O, in order to not block the current thread and people do that because they've heard somewhere that having a special thread-pool for blocking I/O is cool.

I am not talking about scalaz Task here. The fairness provided by Future forking every map is excellent but fairness and throughput need to be balanced. Throughput of the rest of your app is increased by fair access to the global thread pool, we are agreeing here, but Future's eager forking decreases the throughput of the threads that the Future is occupying itself. It's not just cool, it increases the access your app has to threads to do CPU-bound actions, and that's what gets you the response time you mention.

As an aside I agree about the concurrency term but this is the terminology we inherit. Parallelism might be a better term in this case, but blocking threads in parallel? It's just odd to me.

Sorry if this makes little sense, still jetlagged.

alexandru commented 7 years ago

OK, so we are on the same page. I'm also drinking coffee right now 😃

benhutchison commented 7 years ago

We have 3 day long https://twitter.com/hackday_rea starting today, will work on the docs issues mentioned above during it, hopefully close this issue

etorreborre commented 7 years ago

Thanks @benhutchison. Sorry I just saw this notification now because I am away at a conference.