Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
13.02k stars 1.85k forks source link

Need way to use parallel decomposition of list without saturating dispatcher queue #1022

Open MartinDevi opened 5 years ago

MartinDevi commented 5 years ago

I think the coroutine library is missing a simple solution to use parallel decomposition on the contents of a large list without saturating the dispatcher queue, so that other coroutine work won't be suspended until the entire list has been processed.

I present the issue here and some solutions that I considered. If you know better solutions, please point them out.

It seems to me like a common enough scenario and a serious enough issue to be considered in the standard library.

Issue

I have a large list of items, and I need to perform CPU-intensive work on each of them (in case you're curious, deciphering) to obtain an output. I'd like to allow this work to be done in parallel on each item. My first instinct was to simply use async to start all the processes, then await that they all finish.

suspend fun <T, R> Iterable<T>.mapAsync(
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    map { async { transform(it) } }.awaitAll()
}

This works fine in a very simple scenario. However, I then wanted to display the progress, with the number of items that have been successfully processed. I introduced an actor to receive a signal whenever an item has been processed and count the progress, then forward progress updates to another channel.

suspend fun <T, R> Iterable<T>.mapAsync(
    progressChannel: SendChannel<Int>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val progressActor = actor<Unit> {
        var index = 0
        for (unit in channel) {
            progressChannel.send(index++)
        }
    }
    val deferredList = map {
        async {
            val result = transform(it)
            progressActor.send(Unit)
            result
        }
    }
    val result = deferredList.awaitAll()
    progressActor.close()
    result
}

However, this implementation doesn't work. The progress updates are only received after all the items have been processed. To reproduce, I set up the following unit test.

@Test
fun testMapAsync() = runBlocking<Unit> {
    withContext(newFixedThreadPoolContext(4, "")) {
        val progressActor = actor<Int> {
            for (progress in this) {
                println("Progress $progress")
            }
        }

        (1..100).mapAsync(progressActor) {
            Thread.sleep(200)
            println("Processed item $it")
        }

        progressActor.close()
    }
}

Rather than printing the progress progressively, shortly after each item is processed, this test prints all the statements indicating that items are processed, then prints all the statements that progress updates have been received.

From what I understand, this is due to the fact that the dispatcher is "fair". Since all the worker coroutines are enqueued immediately, they will be executed before any other coroutines, including the one which updates the progress.

Existing Possible Solutions and Drawbacks

Dedicated Dispatcher

This method doesn't require sample code, it simply relies on having a dedicated dispatcher for the worker coroutines. The drawbacks are obvious. It's also possible to invert the solution, by having a dedicated dispatcher for the progress actor, but this doesn't fully solve the issue either.

Fan-Out

It's possible to use the "Fan-out" mechanism described in the coroutine guide, to manually parallelize the processing by pulling from a channel.

const val PARALLELISM = 4

suspend fun <T, R> Iterable<T>.mapAsync(
    progressChannel: SendChannel<Int>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val result = mutableListOf<R>()
    val progressActor = actor<R> {
        var index = 0
        for (item in channel) {
            result += item
            progressChannel.send(index++)
        }
    }
    val source = produce {
        forEach { send(it) }
        close()
    }
    List(PARALLELISM) {
        launch {
            for (item in source) {
                progressActor.send(transform(item))
            }
        }
    }.joinAll()
    progressActor.close()
    result
}

When doing this, the initial order of the elements isn't maintained, and the parallelism has to be explicitly provided rather than relying on the default dispatcher to optimize for the number of cores (which could also be done manually, but seems like repetition).

Producer Channel

Rather than using map on a list to call async, use produce to create a channel. This artificially introduces suspension points so that the worker coroutines aren't immediately added to the executor queue, but instead are added regularly when the queue finishes some work. In other words, since adding workers is itself done in a coroutine, it will only be done at the rate at which the coroutine dispatcher becomes available.

suspend fun <T, R> Iterable<T>.mapAsync(
    progressChannel: SendChannel<Int>,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val progressActor = actor<Unit> {
        var index = 0
        for (unit in this) {
            progressChannel.send(index++)
        }
    }
    val deferredChannel = produce<Deferred<R>> {
        forEach {
            send(async {
                val result = transform(it)
                progressActor.send(Unit)
                result
            })
        }
    }
    val result = deferredChannel.toList().awaitAll()
    progressActor.close()
    result
}

This seems like a hack, and I'm unsure of how it behaves when the worker coroutine has multiple suspension points, causing the dispatcher queue to "roll" more frequently.

elizarov commented 5 years ago

The plan is to eventually add a ready-to-use primitive for that purpose and that is tracked under issue #172

uberto commented 5 years ago

I think the problem is that you don't want the progress indicator running in the same thread pool with the decryption workers, otherwise they will get all the resources.

As example if you replace your progressActor with an AtomicInteger

suspend fun <T, R> Iterable.mapAsync( progressChannel: SendChannel, transform: suspend (T) -> R ): List = coroutineScope{

    val counter = AtomicInteger(0)

    val deferredList = map {
        async {
            val result = transform(it)
            progressChannel.send(counter.incrementAndGet())
            result
        }
    }
    val result = deferredList.awaitAll()
    result

}

You then can change the test extracting the progressActor there to the main context and reducing the ThreadPool to 3. This works very well for me.

        val progressActor = actor<Int> {
                for (progress in this) {
                    println("Progress $progress")
                }
            }

        withContext(newFixedThreadPoolContext(3, "transformer")) {

            (1..100).mapAsync(progressActor) {
                Thread.sleep(200)
                println("Processed item $it")
            }

        }
        progressActor.close()

example of output:

Processed item 1 Processed item 2 Progress 1 Progress 2 Processed item 3 Progress 3 Processed item 4 Processed item 5 Progress 4 Progress 5 Processed item 6 Progress 6 Processed item 8 Processed item 7 Progress 7 Progress 8 Processed item 9 Progress 9 Processed item 11 Processed item 10 Progress 10 Progress 11 ...

I hope this could help you.

On Fri, 1 Mar 2019 at 16:01, Martin Devillers notifications@github.com wrote:

I think the coroutine library is missing a simple solution to use parallel decomposition on the contents of a large list without saturating the dispatcher queue, so that other coroutine work won't be suspended until the entire list has been processed.

I present the issue here and some solutions that I considered. If you know better solutions, please point them out.

It seems to me like a common enough scenario and a serious enough issue to be considered in the standard library. Issue

I have a large list of items, and I need to perform CPU-intensive work on each of them (in case you're curious, deciphering) to obtain an output. I'd like to allow this work to be done in parallel on each item. My first instinct was to simply use async to start all the processes, then await that they all finish.

suspend fun <T, R> Iterable.mapAsync( transform: suspend (T) -> R ): List = coroutineScope { map { async { transform(it) } }.awaitAll() }

This works fine in a very simple scenario. However, I then wanted to display the progress, with the number of items that have been successfully processed. I introduced an actor to receive a signal whenever an item has been processed and count the progress, then forward progress updates to another channel.

suspend fun <T, R> Iterable.mapAsync( progressChannel: SendChannel, transform: suspend (T) -> R ): List = coroutineScope { val progressActor = actor { var index = 0 for (unit in channel) { progressChannel.send(index++) } } val deferredList = map { async { val result = transform(it) progressActor.send(Unit) result } } val result = deferredList.awaitAll() progressActor.close() result }

However, this implementation doesn't work. The progress updates are only received after all the items have been processed. To reproduce, I set up the following unit test.

@Test fun testMapAsync() = runBlocking { withContext(newFixedThreadPoolContext(4, "")) { val progressActor = actor { for (progress in this) { println("Progress $progress") } }

    (1..100).mapAsync(progressActor) {
        Thread.sleep(200)
        println("Processed item $it")
    }

    progressActor.close()
}

}

Rather than printing the progress progressively, shortly after each item is processed, this test prints all the statements indicating that items are processed, then prints all the statements that progress updates have been received.

From what I understand, this is due to the fact that the dispatcher is "fair". Since all the worker coroutines are enqueued immediately, they will be executed before any other coroutines, including the one which updates the progress. Existing Possible Solutions and Drawbacks Dedicated Dispatcher

This method doesn't require sample code, it simply relies on having a dedicated dispatcher for the worker coroutines. The drawbacks are obvious. It's also possible to invert the solution, by having a dedicated dispatcher for the progress actor, but this doesn't fully solve the issue either. Fan-Out

It's possible to use the "Fan-out" mechanism described in the coroutine guide, to manually parallelize the processing by pulling from a channel.

const val PARALLELISM = 4

suspend fun <T, R> Iterable.mapAsync( progressChannel: SendChannel, transform: suspend (T) -> R ): List = coroutineScope { val result = mutableListOf() val progressActor = actor { var index = 0 for (item in channel) { result += item progressChannel.send(index++) } } val source = produce { forEach { send(it) } close() } List(PARALLELISM) { launch { for (item in source) { progressActor.send(transform(item)) } } }.joinAll() progressActor.close() result }

When doing this, the initial order of the elements isn't maintained, and the parallelism has to be explicitly provided rather than relying on the default dispatcher to optimize for the number of cores (which could also be done manually, but seems like repetition). Producer Channel

Rather than using map on a list to call async, use produce to create a channel. This artificially introduces suspension points so that the worker coroutines aren't immediately added to the executor queue, but instead are added regularly when the queue finishes some work. In other words, since adding workers is itself done in a coroutine, it will only be done at the rate at which the coroutine dispatcher becomes available.

suspend fun <T, R> Iterable.mapAsync( progressChannel: SendChannel, transform: suspend (T) -> R ): List = coroutineScope { val progressActor = actor { var index = 0 for (unit in this) { progressChannel.send(index++) } } val deferredChannel = produce<Deferred> { forEach { send(async { val result = transform(it) progressActor.send(Unit) result }) } } val result = deferredChannel.toList().awaitAll() progressActor.close() result }

This seems like a hack, and I'm unsure of how it behaves when the worker coroutine has multiple suspension points, causing the dispatcher queue to "roll" more frequently.

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/Kotlin/kotlinx.coroutines/issues/1022, or mute the thread https://github.com/notifications/unsubscribe-auth/AAdYDP-RuKCdmDydHCJLN99gRzH7TtPdks5vSU7VgaJpZM4bZUCY .

fvasco commented 5 years ago

In my opinion the issue in not really addressed and new operators, as proposed, do not solve it.

@MartinDevi solutions works well for this particular use case, but launch multiple mapAsync concurrently can raise the same issue, probably similar for later proposals.

The coroutine scheduler is not fair (a multi thread executor cannot be truly fair), in my opinion the point is that the scheduler are not preemptive, so some large batches can occupy it for an undefined time.

Using only coroutines I consider a solution for this issue a dedicated dispatcher task, bounded like the Default dispatcher but using different set of resource. This should fix this and similar issues. Probably this idea can be enhanced, a similar solution is to use for an 8 CPU computer, 7 + 1 threads for Default dispatcher and the same 7 + another 1 threads for the background dispatcher.

Maybe I really missed something, but I am really curios to know what, thank you.

MartinDevi commented 5 years ago

Thanks for all your input.

IMO there's two main points to take from this issue:

  1. The standard library should contain a dedicated function on iterables (or maybe just collections, because some solutions are easier to implement if the size is known in advance) to handle and optimize the map { async { /* ... * } }.awaitAll() pattern, because implementing this pattern correctly is non-trivial.
  2. This standard function should implement a mechanism to "pull" items from the list progressively, rather than immediately queuing coroutines for all of them, so that the dispatcher isn't immediately saturated.

With that regard, while I agree that issue #172 would help to provide a solution, I think we need a dedicated function for this use-case. Ideally, I think it shouldn't require explicitly specifying the concurrency. I think standard solutions which involve adding dedicated dispatchers for each type of task would ultimately lead to more confusion, and like @fvasco mentioned they won't scale well.

fvasco commented 5 years ago

Regarding the suggested point 2, this mechanism should be global, not local.

In your first example, on 4 CPU machine setting PARALLELISM=4 works well to decipher one file and update UI. In the future you can define a method to decipher all files in a folder using mapAsync, later you define another method to decipher all folders in a disk. This last method launches 4*4*4 parallel tasks, so the supposed parallelism is not honored.

uberto commented 5 years ago

Actually I think there is no way to have a nicely refreshing update if it is not running in its own thread. Sharing the thread pool with greedy coroutines would always result is erratic updates. Happy to be proven wrong but using coroutines for cpu heavy tasks suggest me this.

Uberto

On Mon, 4 Mar 2019, 08:07 Francesco Vasco, notifications@github.com wrote:

Regarding the suggested point 2, this mechanism should be global, not local.

In your first example, on 4 CPU machine setting PARALLELISM=4 works well to decipher one file and update UI. In the future you can define a method to decipher all files in a folder using mapAsync, later you define another method to decipher all folders in a disk. This last method launches 444 parallel tasks, so the supposed parallelism is not honored.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/Kotlin/kotlinx.coroutines/issues/1022#issuecomment-469155872, or mute the thread https://github.com/notifications/unsubscribe-auth/AAdYDN-3SeQqFsrXnBsKgi__kVhG8Lcoks5vTNQugaJpZM4bZUCY .