Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
IO thread pool for blocking calls #79

Closed elizarov closed 6 years ago

elizarov commented 7 years ago

We need some kind of IO dispatcher in kotlinx.coroutines that is optimized for offloading blocking calls and is backed by an unbounded thread pool that is smart enough to create new threads only when needed and to shut them down on timeout. The goal is that you can always wrap a blocking IO call in withContext(IO) { ... } (UPDATED: it was formerly named run(IO)) and be sure that you will not run out of threads in your pool. However, it will be up to the user to control the maximum number of concurrent operations by some other means.

It will be conceptually similar to the newThread() scheduler in Rx, but it will also be designed for use where the io() scheduler in Rx is used.

fvasco commented 7 years ago

Java 6 has Executors.newCachedThreadPool, it is easy to create and easy to customize:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

We can tune maximumPoolSize at startup to limit "the maximum allowed number of threads" through a system property.

However, we should consider that a specific use case should create its own fine-tuned IO executor instead of using the global one.
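As a rough illustration of the tuning mentioned above, here is a minimal Java sketch. The property name `io.pool.maxThreads` and the class `IoPools` are hypothetical and exist only for this example. One caveat it makes visible: a SynchronousQueue has no buffer, so once the cap is reached the executor's default policy rejects further tasks instead of queueing them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class IoPools {
    // Hypothetical property name, used only for this illustration.
    static final String MAX_THREADS_PROPERTY = "io.pool.maxThreads";

    /** Same shape as Executors.newCachedThreadPool(), but with a configurable thread cap. */
    static ThreadPoolExecutor newIoPool() {
        int maxThreads = Integer.getInteger(MAX_THREADS_PROPERTY, Integer.MAX_VALUE);
        return new ThreadPoolExecutor(0, maxThreads,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /** Saturates a pool capped at `cap` threads and reports whether one more task is rejected. */
    static boolean rejectsBeyondCap(int cap) {
        System.setProperty(MAX_THREADS_PROPERTY, Integer.toString(cap));
        ThreadPoolExecutor pool = newIoPool();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // stand-in for a blocking IO call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < cap; i++) {
                pool.execute(blockingTask); // every call starts a new thread
            }
            pool.execute(blockingTask);     // no idle thread, no queue slot, cap reached
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();            // let the parked tasks finish
            pool.shutdown();
            System.clearProperty(MAX_THREADS_PROPERTY);
        }
    }
}
```

So a capped pool of this shape needs either a rejection handler or an external limit on how many blocking calls are submitted at once.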

elizarov commented 7 years ago

We need to make sure that there is no confusion about the difference between run(IO) { /* blocking code here */ } and runBlocking { ... }.

fvasco commented 7 years ago

Kotlin can provide an explicit annotation (like @DslMarker) to prevent invocation from a coroutine.

So we can rename the function to

@Blocking
fun runAndWait( ... )

fvasco commented 7 years ago

Using the code:

val job = ...
run(IO + job) { ... }
job.cancel()

Should the I/O thread be interrupted?

elizarov commented 7 years ago

We should not do interruption by default. It is dangerous. But we definitely need some easy way to adapt an interruptible blocking API (when we are sure it plays well with interrupts) to coroutines. I don't know yet what is going to be the best solution for this. This is also related to #57

elizarov commented 7 years ago

@fvasco W.r.t. the @Blocking annotation: it is actually a larger issue that spans both Kotlin and Java. Here is a related issue in YT: https://youtrack.jetbrains.com/issue/KT-15525

voddan commented 7 years ago

IMHO the name is misleading. When I see run(IO){} I think it is a coroutine context that guarantees that it runs on the main thread, even if started somewhere else. Or is it an impossible case?

elizarov commented 7 years ago

Hmm... the assumption was that the code inside run(IO) { ... } runs on special IO threads that are designed for blocking IO. The naming problem is how to highlight that it is designed for blocking IO only and that you should not go there if you have async IO.

LouisCAD commented 7 years ago

@elizarov Why not use blockingIO then, instead of just IO?

mykola-dev commented 7 years ago

because it's not blocking? and rxjava already has Schedulers.io() which does the same thing

oshai commented 7 years ago

I would call it IOPool (similar to CommonPool), with the ability to give it the number of concurrent operations as a parameter.

voddan commented 7 years ago

BTW, why do we need to call it IO? Isn't it suitable for any blocking operations, IO or not? I feel that the main property of this dispatcher is that it can vary the number of its threads. Maybe something like BlockingHeap would fit better?

fvasco commented 7 years ago

A bit unrelated question.

How does a coroutine integrate with locks? Using IO for a blocking call leads to:

run(IO) { blockingQueue.take() }

This might look reasonable, but putting this line in actors, in thousands of actors, can easily drain the pool or, on the other hand, create thousands of threads.

Currently I can't figure out how to integrate coroutines with synchronized blocks, locks, etc. IO requests are a particular case of blocking code: limiting the number of requests can be a benefit, and it never produces a deadlock.

Without a golden rule for executing any kind of blocking code, I consider it really helpful to highlight the IO nature of this dispatcher.
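The "thousands of threads" concern can be reproduced with plain JDK classes. The sketch below is only an illustration (the class `ThreadGrowth` is a made-up name): it parks a number of tasks on an unbounded cached pool, the way `blockingQueue.take()` would park them, and reports how many threads the pool had to create.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ThreadGrowth {
    /** Parks `blockedCalls` tasks on an unbounded cached pool and returns how many threads it created. */
    static int threadsCreated(int blockedCalls) {
        // Same configuration as Executors.newCachedThreadPool(), built directly so that
        // getLargestPoolSize() is available without a cast.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                         60L, TimeUnit.SECONDS,
                                                         new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < blockedCalls; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stand-in for blockingQueue.take()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Every earlier thread is still parked, so each submission needed a new thread.
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With one parked call per actor, the thread count grows linearly with the number of actors, which is exactly the cost that coroutines are meant to avoid.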

elizarov commented 7 years ago

On the naming: we don't have to have IO in the name (it is indeed for blocking ops), but we want it to be discoverable. The problem with "arbitrary blocking" is that in a large app you might want to have different pools for different kinds of blocking ops, but we need to provide some "least common denominator" out of the box, and the problem of blocking IO is what people are facing all the time, so IOPool might be a good name. I'm also thinking about having "blocking" in the name, but as I've already alluded to earlier, run(BlockingPool) { ... } is too confusing w.r.t. runBlocking { ... }. It might not be an issue if we rename run, though.

elizarov commented 7 years ago

@fvasco Good point. You could use "IO" pool for integration with blocking queues etc. I mean, if you have to integrate with somebody else's BlockingQueue then you have no choice but to schedule the wait into some pool that is designed with blocking operations in mind.

elizarov commented 7 years ago

Here is a fresh idea that was pointed out by Raul Raja on the public Slack. It suffices to have a single CPU-optimized dispatcher if we also provide a Scala-like blocking { ... } wrapper function that tags the block of code as doing a "blocking" operation for the scheduler. Now the scheduler will know how many of its threads are blocked and can create new threads as needed to continue to be able to pump CPU-consuming stuff. This seems to me a better programming model for doing blocking IO compared to the dedicated scheduler/dispatcher.

fvasco commented 7 years ago

Please correct me if I am wrong: is Raja's proposal to create an unbounded thread executor for blocking operations? In that case coroutines remain cheap, but blocking code in coroutines may become a fork bomb.

elizarov commented 7 years ago

Let me clarify. Here is a problem we have: We need to be able to do both CPU-bound tasks and IO/blocking tasks with coroutines. The original proposal (this github issue) was to achieve this via a dedicated IO-tuned dispatcher, so you'd separate these two use-cases by choosing an appropriate dispatcher:

launch(CommonPool) { ... cpu-bound code ... }
launch(IO) { ... blocking IO code ... }

The alternative idea is to borrow Scala's approach: instead of two separate dispatchers let's have a single dispatcher (let's call it DefaultDispatcher) and use it like this:

launch(DefaultDispatcher) { ... cpu-bound code ... }
launch(DefaultDispatcher) { blocking { ... blocking IO code ... } }

What I particularly like about this approach is that it makes sense to truly make this DefaultDispatcher a global default for all coroutine builders and just write:

launch { ... cpu-bound code ... }
launch { blocking { ... blocking IO code ... } }

oshai commented 7 years ago

It looks like a good idea to me. But when I use IO, I usually use thread pools to limit the number of concurrent IO operations I do in parallel. Is there a way to put such a limit in place with that suggestion?

fvasco commented 7 years ago

I agree but it does not look so fresh, it looks like the PR #83

elizarov commented 7 years ago

@fvasco It finally clicked into my mind when I understood how it shall interface with a scheduler. The difference from PR #83 is that it should not require a switch of context at all if the scheduler that currently runs coroutine natively supports blocking operations itself and counts the number of ongoing blocking operations to make decisions on creating new threads.

elizarov commented 7 years ago

@oshai You can always create a thread-number-limited context for your blocking ops. It is a very good choice for people striving for the best control. For example, if you are doing JDBC and you want to limit your code to at most n concurrent DB operations, then you'll define val dbContext = newFixedThreadPoolContext(n, "DB") and use it for all your blocking DB operations, wrapping them into run(dbContext) { .... }.

The idea behind blocking { ... } is that it is going to be convenient for cases where you don't really care to put a limit, which is the case where the number of your concurrent blocking operations is limited through some other means, for example, by simply having a limited number of running coroutines that do blocking operations.
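The effect of a thread-number-limited context can be seen at the plain executor level. The following Java sketch is only an analogy for `newFixedThreadPoolContext(n, "DB")`, not the coroutine API itself; `DbPool` and `peakConcurrency` are hypothetical names, and `Thread.sleep` stands in for a JDBC call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DbPool {
    /** Submits `calls` simulated DB calls to a pool of `n` threads; returns the peak concurrency seen. */
    static int peakConcurrency(int calls, int n) {
        ExecutorService dbPool = Executors.newFixedThreadPool(n);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(calls);
        for (int i = 0; i < calls; i++) {
            dbPool.execute(() -> {
                // Record how many calls are in flight right now.
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20); // stand-in for a blocking DB operation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await(); // extra calls wait in the pool's queue, not on extra threads
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dbPool.shutdown();
        }
        return peak.get();
    }
}
```

Because the pool has only n threads, the observed peak never exceeds n no matter how many calls are submitted; the rest simply queue.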

elizarov commented 7 years ago

@fvasco The other difference is that this blocking { ... } does not have to be suspend. It can be defined with the following signature:

inline fun <T> blocking(block: () -> T): T

This way, you will be able to use it anywhere in your blocking code, regardless of whether you are going to run it from a coroutine or not, and it will have the effect of signaling to the IO-aware coroutine scheduler when it is being used from coroutines.

However, the other implementation option is to declare it as suspend and then make its behavior dependent on what kind of coroutine it is invoked from. It can simply serve as a signal when it is invoked from an IO-aware scheduler, or switch (like run does) into a default IO-aware scheduler otherwise (if you use it from a UI-bound coroutine, for example).
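For comparison, the JDK already has a mechanism close to the non-suspending variant: `ForkJoinPool.ManagedBlocker`. The sketch below is a Java analogy under that assumption and says nothing about how kotlinx.coroutines would implement it; the helper name `blocking` is borrowed from the discussion and the class `Blocking` is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

final class Blocking {
    /**
     * Runs `body` while telling the current ForkJoinPool (if any) that this thread may block,
     * so the pool may add a compensating thread. Outside a ForkJoinPool it simply runs `body`.
     */
    static <T> T blocking(Supplier<T> body) {
        final class Blocker implements ForkJoinPool.ManagedBlocker {
            private T result;
            private boolean done;

            @Override public boolean block() {
                result = body.get(); // the actual blocking call
                done = true;
                return true;         // true: no further blocking is needed
            }

            @Override public boolean isReleasable() {
                return done;
            }
        }
        Blocker blocker = new Blocker();
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while blocking", e);
        }
        return blocker.result;
    }

    /** Usage: the same helper works on a pool thread and on a plain caller thread. */
    static int demo() {
        Callable<Integer> onPoolThread = () -> blocking(() -> 20);
        int fromPool = ForkJoinPool.commonPool().submit(onPoolThread).join();
        Supplier<Integer> direct = () -> 22;
        int fromCaller = blocking(direct);
        return fromPool + fromCaller;
    }
}
```

As with the proposed signature, the wrapper is an ordinary function: it acts as a hint when the scheduler understands it and is a plain call otherwise.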

fvasco commented 7 years ago

Hi @elizarov, building an unbounded thread pool is trivial in Java, and so is checking thread ownership.

Both changes are tiny patches over #83. Maybe your proposal is more complex, so I will wait for a code review.

voddan commented 7 years ago

Did I get it correctly that if I do boo(); blocking { foo() } inside a coroutine, then foo() may run well after boo() and in another thread, depending on the scheduler?

elizarov commented 7 years ago

That is what I'm thinking about... For example, if you do boo(); blocking { foo() } in the "IO-aware" thread, it can run without switching threads (which is cool for performance reasons); however, if you run the same code in UI, then blocking { foo() } can suspend, switch to a background thread, then resume back in the UI thread.

maxd commented 6 years ago

I want to clarify how cancellation will work in such a case:

val job = launch { 
    blocking { 
        // this blocking IO code is hung or executing too long without ability to handle `isActive`
    } 
}

...

job.cancel()

Can the job.cancel() interrupt/abort execution of blocking code?

elizarov commented 6 years ago

@maxd Unfortunately, most of the time blocking code on JVM cannot be readily interrupted/aborted (see, for example, all the blocking I/O APIs on JVM, JDBC drivers, etc.), so there is no way to provide such a facility out of the box. However, for those rare cases when there is some way to abort running blocking code, you can always write the corresponding aborting logic.

For example, if you are doing a blocking operation on a network socket, there is a documented (guaranteed to work) way to abort it by closing the underlying socket, so one can write:

withContext(IO) {
    val job = coroutineContext[Job]!! // retrieve the current job from the coroutine context
    job.invokeOnCompletion {
        if (job.isCancelled) socket.close() // abort blocking operations on cancellation by closing socket
    }
    socket.performSomeBlockingOperationHere()
}

We might provide an easier to use API for that, like coroutineContext.invokeOnCancellation { ... } extension function.

However, note that it is not a general solution in any way. For example, you cannot generally abort an ongoing blocking JDBC operation in such a way, as trying to close a JDBC connection while the operation is in progress usually blocks until the operation is complete.
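The socket behavior described above can be checked with the JDK alone. This is a minimal Java sketch, not coroutine code: the class `SocketAbort` is a hypothetical name, a second thread plays the role of the cancellation handler, and a loopback connection with a silent peer keeps `read()` blocked.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

final class SocketAbort {
    /** Returns true if a blocked read() was aborted by closing the socket from another thread. */
    static boolean readAbortedByClose() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {   // peer stays silent, so the read below blocks
            Thread canceller = new Thread(() -> {
                try {
                    Thread.sleep(200);          // stand-in for "the job was cancelled"
                    client.close();             // documented to fail pending I/O on this socket
                } catch (IOException | InterruptedException ignored) {
                    // nothing useful to do in this sketch
                }
            });
            canceller.start();
            try {
                client.getInputStream().read();
                return false;                   // read returned normally: not aborted
            } catch (SocketException expected) {
                return true;                    // close() from the other thread unblocked us
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The blocked thread sees a SocketException rather than an interrupt, so the calling code has to translate that exception into whatever cancellation signal it uses.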

maxd commented 6 years ago

@elizarov I agree that some things don't provide the ability to cancel long-running tasks. However, in some cases, they still need to be cancelled forcibly.

For example, I have a GUI application. The main priority here is user experience. Reliability comes second because, to tell the truth, even if I call Thread#interrupt on long-running jobs that just read data, in most cases everything will work fine. So, if I want to check a connection and the application shows a progress dialog with a cancel button, I should be able to abort this check even if it is hung (e.g. an external library sets the default connection timeout to 60 seconds and I can't change that), because I lose nothing and it improves the user experience (the user realizes that the connection is hung after 5 seconds and doesn't want to wait another 55 seconds).

I think it would be great if Kotlin coroutines could help handle such rare cases, but it looks like the blocking approach described in previous comments doesn't solve this problem. I mean that Kotlin coroutines will not provide the ability to interrupt/abort blocking calls. Right?

So, if that is true, I see the following possible ways to handle it myself:

1) don't use Kotlin coroutines for such cases at all
2) wrap the long-running task in a separate thread/future (like in the example above with the socket), if you understand all the consequences:

val result = async {
    val executor = Executors.newSingleThreadExecutor()
    val future = executor.submit(Callable<Int> {
        longOperation()
    })

    val job = this.coroutineContext[Job]!!
    job.invokeOnCompletion(true) { // DEPRECATED: may be replaced with `invokeOnCancellation { ... }` in the future
        if (job.isCancelled) {
            future.cancel(true)
        }
    }

    future.get()
}

Am I right?

elizarov commented 6 years ago

@maxd Unfortunately, there is no "one fits everybody" solution to cancelling blocking operations. Different blocking libraries support different approaches to cancellation, or none at all. Some of them support the interrupted flag; others crash and/or hang when their thread is interrupted. For those cases that do support interrupts we have the following issue open: https://github.com/Kotlin/kotlinx.coroutines/issues/57

I would not recommend using the code you've provided in production for two reasons:

As a work-around for your particular case I'd recommend doing this.

First, implement awaitInterruptible extension for CompletableFuture in a straightforward way:

suspend fun <T> CompletableFuture<T>.awaitInterruptible(): T =
    suspendCancellableCoroutine { cont ->
        whenComplete { value, exception ->
            when {
                exception != null -> cont.resumeWithException(exception)
                else -> cont.resume(value)
            }
        }
        cont.invokeOnCompletion { 
            cancel(true) // interrupt running!
        }
    }

I've submitted a related feature request: #259

Then define one shared thread-pool for all your blocking calls:

val threadPool = Executors.newCachedThreadPool() // this seems to be the best type of pool to use

Finally, you can define the following helper function to make conversion of your interruptible blocking calls to suspending cancellable functions straightforward:

suspend inline fun <T> blockingInterruptible(crossinline block: () -> T) =
    CompletableFuture.supplyAsync(Supplier { block() }, threadPool).awaitInterruptible()

maxd commented 6 years ago

First of all, I want to say thank you for these examples. They are very useful for me.

The workaround described above has a problem: CompletableFuture#cancel(true) doesn't interrupt the execution of a long-running operation. I have found on the Internet that one way to solve it is to implement my own "CompletableFuture" with the ability to interrupt a long-running operation. So, I think I will use your workaround and a custom "CompletableFuture" to solve my problem with cancelling long-running operations started from Kotlin coroutines.
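The difference is easy to observe with the JDK alone: `CompletableFuture.cancel(true)` is documented to ignore its `mayInterruptIfRunning` argument, while the `Future` returned by `ExecutorService.submit` interrupts the worker. A minimal Java sketch (the class and method names are hypothetical) that runs the same sleeping task both ways:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class CancelDemo {
    /** Starts a sleeping task, calls cancel(true), and reports whether the worker saw an interrupt. */
    static boolean cancelInterruptsWorker(boolean useCompletableFuture) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        Runnable longOperation = () -> {
            started.countDown();
            try {
                Thread.sleep(2_000);         // stand-in for a long blocking call
            } catch (InterruptedException e) {
                interrupted.countDown();     // the worker thread was interrupted
            }
        };
        try {
            Future<?> future;
            if (useCompletableFuture) {
                future = CompletableFuture.runAsync(longOperation, pool);
            } else {
                future = pool.submit(longOperation);
            }
            started.await();                 // make sure the task is actually running
            future.cancel(true);
            return interrupted.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();              // clean up the still-sleeping worker, if any
        }
    }
}
```

So one option, instead of a custom CompletableFuture, is to keep the `Future` from `submit` around and call `cancel(true)` on that when the coroutine is cancelled.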

Thank you.

mkotlikov commented 6 years ago

This was my solution, that way you can call it with async(CachedThreadPoolContext()){}. Thoughts?

fun CachedThreadPoolContext() = CachedThreadPoolDispatcher()
class CachedThreadPoolDispatcher : ExecutorCoroutineDispatcherBase() {
    companion object {
        private val executor = Executors.newCachedThreadPool()!!
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
}

This was my original solution using CompletableFuture:

suspend fun <T> asyncResult(execute: () -> T): T = Async.asyncResult(execute)
object Async {
    private val ASYNC_RESULT_THREAD_POOL =  Executors.newCachedThreadPool()

    suspend fun <T> asyncResult(execute: () -> T): T {
        data class AsyncResponse<T>(val result: T?, val exception: Exception?)

        val responseChannel = Channel<AsyncResponse<T>>()
        val asyncResponse: AsyncResponse<T>?

        CompletableFuture.runAsync (
                Runnable {
                    try {
                        val executionResult = execute()
                        launch(NoopContinuation.context) {
                            responseChannel.send(AsyncResponse(
                                    result = executionResult,
                                    exception = null
                            ))
                        }
                    } catch (exception: Exception) {
                        launch(NoopContinuation.context) {
                            responseChannel.send(AsyncResponse(
                                    result = null,
                                    exception = exception
                            ))
                        }
                    }
                },
                ASYNC_RESULT_THREAD_POOL
        )

        asyncResponse = responseChannel.receive()

        if (asyncResponse.exception != null) {
            throw asyncResponse.exception
        }

        return asyncResponse.result!!
    }
}

elizarov commented 6 years ago

UPDATE on the current design thinking around this issue:

Open questions:

LouisCAD commented 6 years ago

BlockingOps as an alternative name to IO comes to my mind. It's 2 words, like CommonPool.

GeoffreyMetais commented 6 years ago

What about Extended or Expandable? As it's an expanding context, this name emphasizes the fact that this context is not fixed and is more costly than CommonPool.

dave08 commented 6 years ago

Or ElasticPool, ExtendablePool? And withBlocking { }?

LassoMike commented 6 years ago

I like CachedThreadPoolContext because it sounds like the existing newSingleThreadContext and newFixedThreadPoolContext. Or CachedThreadPool because it sounds like CommonPool.

fvasco commented 6 years ago

Using blocking { ... } is questionable, as it is quite similar to runBlocking { ... } which would ensure lots of confusion and misunderstanding

It is also possible to settle this debate by renaming the runBlocking function. As a counterpart to the async { } function, we can rename runBlocking { } to sync { }, for example.

How do we name the marker block

I consider a dedicated builder useful, especially to indicate the synchronous/blocking nature of the invoked functions. The name should sound like the defined pool: for a BlockingPool pool we can use blocking { }, for an IO pool we use io { }, and so on...

elizarov commented 6 years ago

Renaming runBlocking is on the table, too.

As for the naming of the context, please keep in mind #261. All the contexts will be backed by the same shared pool, so I don't think it is appropriate to use Pool in their names. Moreover, I'm considering deprecating CommonPool and recommending DefaultContext as its replacement when we are done with this.

Let me also link a related discussion on the naming of dispatchers: #41. I'm not convinced that we should isolate names of the dispatchers into a separate namespace, but I'm also not firmly convinced that they should be top-level.

fvasco commented 6 years ago

Personally I consider DefaultContext a good name, but a little messy. Defining only DefaultContext invites defining an AlternativeContext: chaos over confusion.

A coroutine context appears richer than a single coroutine dispatcher, so using Context leads me to consider it as a dispatcher plus something else... What is wrong in DefaultContext is only the dispatcher; we can name it CpuBoundDispatcher. Giving it a clear name makes the problem more evident: we cannot use the CpuBoundDispatcher for a non-CPU-bound task.

Instead, defining

val DefaultContext : CoroutineContext = CpuBoundDispatcher + (optionally something else)

leads me to consider

val BlockingContext = DefaultContext + NonCpuBoundDispatcher

Unfortunately I haven't solved this issue, I am sorry. The above considerations are confusing to me, but that is probably the reason why this report looks more like a beauty contest than a technical problem.

elizarov commented 6 years ago

Now that we have merged the experimental coroutines dispatcher, we can soon deliver an IO scheduler using it, even if DefaultDispatcher still points to CommonPool for some time.

LouisCAD commented 6 years ago

I'm a bit confused as to what to expect for the future of kotlinx.coroutines default/recommended dispatchers. Will we have to continue running I/O separately from CPU-bound tasks, like it's currently done with CommonPool and a custom dispatcher (often executor-based), or will they be merged into the new experimental coroutines dispatcher so we no longer have to worry about I/O being done in the same dispatcher as the CPU-bound code?

elizarov commented 6 years ago

The expected future looks like this. There is going to be only a single pool of threads in addition to the UI thread.

fvasco commented 6 years ago

Its default parallelism is equal to the number of cores.

Have you considered:

The current plan is to set defaultParallelism to nCPUs + 1 as a compromise value that ensures utilization of the underlying hardware even if one coroutine accidentally blocks and helps us avoid issue #198

and later in https://github.com/Kotlin/kotlinx.coroutines/issues/261 ?

voddan commented 6 years ago

@elizarov What's the state of mind on this issue? Dispatchers.IO is as confusing as ever.

IMHO of all the names suggested above, Dispatchers.Elastic is the most direct and self-describing one. Also, it draws a useful parallel with Amazon Elastic Compute Cloud.