Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Support natural batching in flow #902

Open mtopolnik opened 5 years ago

mtopolnik commented 5 years ago

Natural (a.k.a. smart) batching is a technique in stream processing that optimizes throughput without affecting latency. Taking a concurrent queue as an example, the consumer can atomically drain all the items observed at some instant and then process them as a batch. Ideally, the queue should be bounded, which puts an upper limit on the batch size and provides backpressure to the sender at the same time.

It's called "natural" batching because there's no imposed batch size: when the traffic is low, it will process each item as soon as it arrives. In that case you don't need any throughput optimizations by batching items together. When the traffic gets higher, the consumer will automatically start processing larger batches, amortizing the fixed latency of a single operation like a database INSERT.

I wrote this sample code that achieves the basic goal:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

const val batchLimit = 20

@ObsoleteCoroutinesApi
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
        handleItems: (List<T>) -> Unit
) {
    val buf = mutableListOf<T>()
    while (true) {
        receiveOrNull()?.also { buf.add(it) } ?: break
        for (x in 2..batchLimit) {
            poll()?.also { buf.add(it) } ?: break
        }
        handleItems(buf)
        buf.clear()
    }
}

We can test it with this:

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val chan = generateMockTraffic()
    runBlocking {
        chan.consumeBatched { println("Received items: $it") }
    }
}

@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
    return GlobalScope.produce(capacity = batchLimit) {
        (1..100).forEach {
            send(it)
            if (it % 10 == 0) {
                delay(1)
            }
        }
    }
}

consumeBatched() polls the queue one item at a time and therefore must additionally impose a batch limit. It would be more efficient if written against a concurrent queue like the Agrona project's OneToOneConcurrentArrayQueue, which supports the drain operation.
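For comparison, the JDK's blocking queues already expose this shape of operation through drainTo. The sketch below is not the channel API, and it blocks a thread rather than suspending; takeBatch is a name made up here. It only illustrates waiting for the first item and then draining whatever else has already arrived, in a single call.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue

// Hypothetical helper: blocks until at least one item is available, then
// drains whatever else is already queued (at most maxBatch items in total)
// without waiting for more to arrive.
fun <T : Any> BlockingQueue<T>.takeBatch(maxBatch: Int): List<T> {
    require(maxBatch >= 1) { "maxBatch must be positive but was $maxBatch" }
    val batch = ArrayList<T>(maxBatch)
    batch.add(take())            // wait for the first item
    drainTo(batch, maxBatch - 1) // take the rest in one call; never blocks
    return batch
}
```

With a bounded queue such as ArrayBlockingQueue(20), the queue capacity gives the same upper limit on batch size that batchLimit provides above.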

Could support for natural batching be considered as a feature to add?


Comment by @qwwdfsad, taken from Stack Overflow:

It depends on the desired API surface. A drain member is unlikely to fit channel semantics: it constrains the implementation, it would somehow have to expose a drain limit, and it gives the channel a more "collection-like" API. E.g. how should drain behave with an unlimited channel? Is it possible to implement drain in an efficient manner (with a pre-sized buffer, but avoiding OOMs and unlimited collections) once and use it with any channel implementation?

What could be added are hints from the channel, such as the expected capacity and the count of enqueued elements. They can have relaxed semantics with a default implementation and act as hints to a drain extension with some reasonable, configurable upper bounds.

fvasco commented 5 years ago

Natural (a.k.a. smart) batching is a technique in stream processing that optimizes throughput without affecting latency.

The above statement is reasonable, but it has to be proven for at least one channel type (i.e. for the array channel).

In my opinion, a receiveMany method has to handle backpressure, e.g. receive(10) or receiveTo(buffer).

elizarov commented 5 years ago

@mtopolnik Can you please share the use case you have in mind? I do want to study those, because I'm also reluctant to add this kind of API to channels. These kinds of performance optimizations are better kept hidden, as implementation details of higher-level APIs.

Let me explicitly mention #254 here. With an abstraction for cold streams in place, data flows become declarative -- you declare your data processing pipeline and then launch it, without even having to explicitly use communication primitives like channels, etc. So, if your use case is to generate a sequence of data items in one thread and consume them in another, then you would be able to use the following hypothetical API to do it:

dataSource() // produces data in the context of one thread
    .withContext(other)
    .forEach { doSomethingWith(it) }

Now, behind the scenes, the withContext combinator will switch between the threads using an extremely efficient, naturally-batched single-producer single-consumer queue, without you, as its end user, ever having to think about it.

mtopolnik commented 5 years ago

The issue is not the efficiency of inter-thread communication, but what you want to do with the items. Natural batching applies to any situation where you're performing an operation that:

  1. accepts a collection of items
  2. has a fixed cost which dwarfs the per-item cost

This fits many use cases, basically every example where you deal with a remote system. It could be a database INSERT, a REST service POST etc. It even applies to within-process operations such as simple file writing, if the requirement is to use an open-write-close cycle.
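To make the second condition concrete, here is a small sketch of the amortization. The cost model and the numbers used with it (10 ms fixed, 0.1 ms per item) are made up for illustration, and costPerItemMs is a hypothetical helper, not part of any library.

```kotlin
// Hypothetical cost model: every call pays fixedMs once, plus perItemMs for
// each item in the batch. Returns the average cost attributed to one item.
fun costPerItemMs(fixedMs: Double, perItemMs: Double, batchSize: Int): Double {
    require(batchSize >= 1) { "batchSize must be positive but was $batchSize" }
    return (fixedMs + perItemMs * batchSize) / batchSize
}
```

With those assumed numbers, sending one item per call costs about 10.1 ms per item, while a batch of 20 brings it down to about 0.6 ms per item.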

If all I have is forEach, I can't observe a batch of items as a unit. I need an API that gives me items batch-by-batch.

I can't build upon forEach to achieve the goal, either, because I have no natural batch demarcation. In the steady state the consumer should be working at full speed, never spending any time waiting for a batch to form. Once it's done with the current batch, it immediately accepts all the items that have accumulated since the last time it asked for more.

elizarov commented 5 years ago

Ok. Let me rephrase this, again, as a feature request for #254. You want to have an API like this:

dataSource()
    .forEachBatch { doSomethingWithBatch(it) }

mtopolnik commented 5 years ago

Yes, that would go a long way. One stumbling point here is the question of the ownership of the batch collection.

On the one hand there's the concern of robustness: surrendering the ownership to the consumer function on each invocation is the safest option.

On the other hand we have GC concerns: we'd like to reuse the same collection for all invocations of the consumer function.

It's hard to satisfy both concerns in a simple API. Possibly the user could supply their own collection as an optional first argument to forEachBatch. This would help make the ownership explicit.

elizarov commented 5 years ago

@mtopolnik I would rather avoid the issue of ownership in the easy-to-use public API (provide a fresh instance every time) to make it less error-prone, but design some lower-level APIs for advanced users that would allow them to write their own operator implementations, optimized the way they see fit.
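The trade-off can be seen in isolation with plain collections. Both helpers below are hypothetical, and they do simple size-based chunking over an Iterable, not natural batching; they only contrast a reused buffer with a fresh list per batch.

```kotlin
// Reuses one buffer for every batch: no per-batch allocation, but the list
// handed to the consumer is only valid until the consumer returns.
inline fun <T> Iterable<T>.forEachBatchReusing(size: Int, consumer: (List<T>) -> Unit) {
    val buffer = ArrayList<T>(size)
    for (item in this) {
        buffer.add(item)
        if (buffer.size == size) {
            consumer(buffer)
            buffer.clear() // anything the consumer kept a reference to is emptied here
        }
    }
    if (buffer.isNotEmpty()) consumer(buffer)
}

// Hands over a fresh list each time: the consumer owns it and may keep it.
inline fun <T> Iterable<T>.forEachBatchFresh(size: Int, consumer: (List<T>) -> Unit) {
    var buffer = ArrayList<T>(size)
    for (item in this) {
        buffer.add(item)
        if (buffer.size == size) {
            consumer(buffer)
            buffer = ArrayList(size) // the emitted list is never touched again
        }
    }
    if (buffer.isNotEmpty()) consumer(buffer)
}
```

A consumer that stores the batches it receives sees the difference: with the reusing variant every stored reference points at the same list, while the fresh variant keeps each batch intact.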

fvasco commented 5 years ago

Hi @mtopolnik for your use case you can consider:

suspend fun <E : Any> Channel<E>.consumeEachBlock(maxBlockSize: Int, consumer: (List<E>) -> Unit) {
    consume {
        val buffer = ArrayList<E>(maxBlockSize)
        while (coroutineContext[Job]?.isActive != false) {
            buffer += receiveOrNull() ?: return
            while (buffer.size < maxBlockSize) {
                val element = poll() ?: break
                buffer += element
            }
            consumer(buffer)
            buffer.clear()
        }
    }
}

mtopolnik commented 5 years ago

@fvasco This code looks very similar to the code I wrote in the issue description. I can't make out what it improves.

  1. You have explicit checking of the isActive flag, but isn't it checked on each poll anyway?
  2. You have consume which ensures that the receive channel is cancelled when exiting the function. I suppose this has an effect if we exit the loop prematurely due to the coroutine being cancelled.

elizarov commented 5 years ago

In order to make it efficient one indeed needs a built-in operation on channels that "pools" (drains?) multiple elements at once into a user-provided collection. That could help to reduce the amount of internal synchronization.

mtopolnik commented 5 years ago

I went once again to inspect Agrona's queue, which I consider the canonical implementation of a zero-contention SPSC queue.

All operations are done with the release-acquire consistency level (this avoids the costly volatile writes), and none of the metadata variables (head, tail, etc.) are updated from both sides, which would force the underlying cacheline to change ownership all the time. There's also padding between the head and tail memory locations so they don't share the same cacheline. Only the backing array is written from both sides, but at distant locations, which also reduces the cacheline churn.

The only saving that I see is that the loop in drain() avoids reloading the head variable in each iteration, but since only the consumer updates it, the penalty is quite small. It does update head in each iteration, which has the benefit of allowing the producer to proceed with adding more items as soon as possible, especially if the consumer gets suspended.
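A much-simplified sketch of the layout described above, assuming a power-of-two capacity. It is not Agrona's code: the class and method names are made up, there is no cache-line padding, and lazySet on the JDK atomics stands in for the release stores.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReferenceArray

// Hypothetical single-producer single-consumer ring buffer.
class SpscQueue<T : Any>(capacity: Int) {
    init {
        require(capacity > 0 && (capacity and (capacity - 1)) == 0) {
            "capacity must be a power of two but was $capacity"
        }
    }

    private val mask = (capacity - 1).toLong()
    private val slots = AtomicReferenceArray<T?>(capacity)
    private val head = AtomicLong() // written only by the consumer
    private val tail = AtomicLong() // written only by the producer

    // Producer side: returns false when the queue is full.
    fun offer(item: T): Boolean {
        val t = tail.get()
        if (t - head.get() == slots.length().toLong()) return false
        slots.lazySet((t and mask).toInt(), item)
        tail.lazySet(t + 1) // publish the item to the consumer
        return true
    }

    // Consumer side: handles the items visible at this instant, at most limit.
    // head is loaded once, but stored after every item so that the producer
    // can reuse freed slots before the whole batch has been handled.
    fun drain(limit: Int, handler: (T) -> Unit): Int {
        var h = head.get()
        val available = minOf(tail.get() - h, limit.toLong()).toInt()
        repeat(available) {
            val index = (h and mask).toInt()
            val item = slots.get(index)!!
            slots.lazySet(index, null)
            head.lazySet(++h)
            handler(item)
        }
        return available
    }
}
```

The sketch is only safe with exactly one producer thread calling offer and one consumer thread calling drain.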

elizarov commented 5 years ago

@mtopolnik Unfortunately, channels are MPMC, so adding "drain" to them would offer only so much benefit. For real scalability we'll need #254, which could use SPSC queues behind the scenes for buffers and for switching between threads.

mtopolnik commented 5 years ago

As a cross-check, Agrona's MPMC queue also doesn't improve anything in drain(), which just calls poll() in a loop. So it's there just as a convenience.

elizarov commented 5 years ago

@mtopolnik Our channel implementation could offer a bit of benefit above calling poll in the loop, but that benefit would be small compared to moving from MPMC to SPSC.

fvasco commented 5 years ago

@mtopolnik

This code looks very similar to the code I wrote in the issue description.

Sorry, I forgot it :-(

You have explicit checking of the isActive flag, but isn't it checked on each poll anyway?

See the note here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html

@elizarov

In order to make it efficient one indeed needs a built-in operation on channels that "pools" (drains?) multiple elements at once into a user-provided collection

I propose to consider also sendAll operator, in my use case I receive data from the network, parse it and then I send all messages to a channel.

More generally, these primitives could be reconsidered when implementing ByteChannels.

qwwdfsad commented 5 years ago

So basically we are postponing this feature until #254 and then we (probably) will be able to provide a consistent API for both cold and hot streams. We will keep this feature request in mind while designing it, thanks for pointing this out!

It will most probably be an extension that helps use cases similar to the original one, where most of the performance win comes from cutting down the cost of item pre-processing setup. While we are designing and implementing our core primitives as efficiently as possible, it is not our goal to provide public access to primitives that are optimized for particular inter-thread communication patterns, such as specialized channels and/or specialized methods that amortize the cost of volatile memory operations.

I propose to consider also sendAll operator, in my use case I receive data from the network, parse it and then I send all messages to a channel.

What is the value of this operator? What problem does it solve to be introduced into the library?

fvasco commented 5 years ago

What problem does it solve to be introduced into the library?

Reduce synchronization costs (adding a block in an ArrayChannel)

qwwdfsad commented 5 years ago

Do you have a real-world problem where performance-critical code is dominated by lock contention in ArrayChannel?

fvasco commented 5 years ago

None for sendMany or receiveMany.

Globegitter commented 5 years ago

This is what I have come up with now, using flows:

@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    val buffer = ArrayList<T>(maxChunkSize)
    collect {
        buffer += it
        if (buffer.size < maxChunkSize) {
            val offered = offer(buffer.toList())
            if (offered) {
                buffer.clear()
            }
        } else {
            send(buffer.toList())
            buffer.clear()
        }
    }
    if (buffer.size > 0) send(buffer.toList())
}.buffer(1)

So it is meant to be used as flowOf(...).bufferedChunks(10).map { ... } or similar. I thought that keeps it more composable than taking a consumer function directly. Not sure if it is the most efficient way to implement this, but it seems to work quite well in my experiments.

pacher commented 5 years ago

So basically we are postponing this feature until #254 and then we (probably) will be able to provide a consistent API for both cold and hot streams. We will keep this feature request in mind while designing it, thanks for pointing this out!

Can we "unpostpone" this feature since #254 is closed?

Upd.:

  1. Use-case landscape is the same as for Flow.conflate where you need to perform an action only on most recent item, but all the previous stale items should be taken into account as part of some computation and hence can not be just dropped.
  2. I would personally use it all the time and everywhere even if I don't do batch processing. Just for the sake of being able to check this batch size, make sure it is always 1 and report back to me if it is not the case. So that I can see warnings in the logs like "Hey! Your consumer is too slow! You need to do something about it!"

Upd. 2: The only implementation of such batching I know from reactive world is @akarnokd's coalesce

elizarov commented 5 years ago

I think we can manage to implement it as part of #1302. That is, a single chunked operator that supports duration and size limit parameters should work like "natural buffering" when the duration is set to zero and the size limit is set to a very big value.

circusmagnus commented 5 years ago

@elizarov Does this mean that #1558 is being rejected because we want to have a single chunked operator for both size and duration?

What about windowed variants?

elizarov commented 5 years ago

@circusmagnus We have not yet nailed down design of the API for flow windowed/chunked operations. Just gathering ideas and thoughts in all these issues.

circusmagnus commented 5 years ago

Ok.

pacher commented 3 years ago

I think we can manage to implement it as part of #1302. That is, a single chunked operator that supports duration and size limit parameters should work like "natural buffering" when the duration is set to zero and the size limit is set to a very big value.

@elizarov Here you mentioned chunked operator and I quote:

Unlike buffer, this operator will be fully sequential.

Unless I misunderstand something, natural batching can not be sequential and implies concurrency just like buffer. Moreover, your solution from here does create an additional coroutine with produceIn.

For the sake of discussion in #1302 I would like to ask: Do you still plan to make chunked sequential or will it be more like buffer and collect upstream concurrently?

Also, I would suggest that you consider the natural batching use case for the buffer operator instead of (or in addition to?) chunked. Something like an optional parameter batch: Boolean = false in buffer. Buffer is already collecting upstream asynchronously and keeping the buffer. A simple switch to emit whatever is in the buffer right now as a List instead of one-by-one feels natural and in place from the API point of view.

jGleitz commented 3 years ago

A simple switch to emit whatever is in the buffer right now as a List instead of one-by-one feels natural and in place from the API point of view.

@pacher In principle, I agree. When I was looking for this functionality in the existing APIs, one of my first attempts was to see if I could get buffer to behave the way I wanted it to behave.

However, I don’t think your proposal is doable: It’s not possible for buffer to have a different return type based on whether batch is set to true or false.

From my point of view, a non-sequential chunked would make sense. Nevertheless, we might learn from @pacher’s comment and mine that adding a pointer to buffer’s docs might help users find their way. Something like:

If you want an operator that emits all currently buffered elements instead of each element one-by-one, use chunked(parallel = true)

pacher commented 3 years ago

Good point about return type @jGleitz, don't know how I missed that. Chunked will have a List in the return type by design.

However the question about chunked being fully sequential and how it can fit with batching still remains.

elizarov commented 3 years ago

@pacher

Unless I misunderstand something, natural batching can not be sequential and implies concurrency just like buffer. Moreover, your solution from here does create an additional coroutine with produceIn.

You are right. I have not really thought it through initially. Natural batching does require concurrency.

Also, I would suggest that you consider the natural batching use case for the buffer operator instead of (or in addition to?) chunked. Something like an optional parameter batch: Boolean = false in buffer. Buffer is already collecting upstream asynchronously and keeping the buffer. A simple switch to emit whatever is in the buffer right now as a List instead of one-by-one feels natural and in place from the API point of view.

The name buffer would work, but its signature (returning Flow<T>) would not. We'll need a separate operator that returns Flow<List<T>>, so it is still closer to the chunked/windowed operators. The trick is to actually design it in a way that avoids operator explosion.

schikin commented 3 years ago

I use this:

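import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.chunked(batchSize: Int): Flow<List<T>> = flow {
    var accumulator = ArrayList<T>(batchSize)

    this@chunked.collect {
        accumulator.add(it)

        if (accumulator.size == batchSize) {
            emit(accumulator)

            // start a fresh list instead of clearing the emitted one, so that a
            // downstream which retains the chunk (e.g. toList()) is not affected
            accumulator = ArrayList(batchSize)
        }
    }

    //emit the remainder if there's any
    if (accumulator.isNotEmpty()) {
        emit(accumulator)
    }
}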

Does the job for me

pacher commented 3 years ago

@schikin As far as I can tell your snippet does simple size-based chunking. The issue is about natural batching, which is very different.

schikin commented 3 years ago

@pacher Got it. I just think people Googling for simple batching with Kotlin Flows (like this https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer-int-) end up in this thread

stiost commented 2 years ago

Any progress on this? A simple receiveMany(maxCount) would help my case a lot.

Steell commented 1 year ago

I've used the following for "natural" buffering, works well for my needs:

fun <A> Flow<A>.chunked(): Flow<List<A>> = channelFlow {
    var batch = mutableListOf<A>()
    collectLatest {
        batch.add(it)
        send(batch)
        batch = mutableListOf()
    }
}.buffer(Channel.RENDEZVOUS)

circusmagnus commented 1 year ago

^ Smart one. I do have a more low-level solution. It might be a little more efficient, without coroutine cancellation and such:

Helper function to drain a channel of all elements, without suspending:

private tailrec fun <T> ReceiveChannel<T>.drain(acc: MutableList<T> = mutableListOf(), maxElements: Int): List<T> =
    if (acc.size == maxElements) acc
    else {
        val nextValue = tryReceive().getOrElse { error: Throwable? -> error?.let { throw(it) } ?: return acc }
        acc.add(nextValue)
        drain(acc, maxElements)
    }

^ It returns a chunk after all elements (which are currently present) are drained from a channel, or we reach max size of a chunk.

Another helper to wait for the first element in channel and then drain it:

private suspend fun <T> ReceiveChannel<T>.awaitFirstAndDrain(maxElements: Int): List<T> = try {
    val first = receive()
    drain(mutableListOf(first), maxElements)
} catch (e: ClosedReceiveChannelException) {
    emptyList()
}

and finally a flow operator:

fun <T> Flow<T>.naturalBatching(maxSize: Int): Flow<List<T>> = flow {
    coroutineScope {
        // concurrently write to the chunking channel
        val upstream = buffer(maxSize).produceIn(this)
        while (!upstream.isClosedForReceive) {
            // concurrently drain the channel into chunks
            val chunk = upstream.awaitFirstAndDrain(maxSize)
            if (chunk.isNotEmpty()) emit(chunk)
        }
    }
}

hoc081098 commented 1 year ago

Check my implementation 😁 (#FlowExt)

https://hoc081098.github.io/FlowExt/docs/0.x/-flow-ext/com.hoc081098.flowext/buffer-count.html?query=fun%20%3CT%3E%20Flow%3CT%3E.bufferCount(bufferSize:%20Int):%20Flow%3CList%3CT%3E%3E


pacher commented 1 year ago

@hoc081098 Again, as with the snippet just a few comments above: your implementation is simple size-based chunking. The issue is about natural batching, which is very different.

See an example snippet from @elizarov for a "reference" 😉 implementation