gmarchelos opened this issue 5 years ago
Could you please explain your use case a bit? E.g., what kind of domain-specific problem would you like to solve with such an operator?
I need to batch live events to reduce synchronization cost and limit refresh rate.
Currently I have implemented this in my project: events are sent to a channel and batched at 10 Hz in an event bus.
I have a flow of trades, and I need to collect the trades within a specified timespan (many trades are likely to occur within 1-5 s) and then process the collected trades. Processing trades one by one is costly for me, so I would like a buffer operator that collects events within a specified timespan.
Thanks. This kind of operator would be called `chunked` in Kotlin lingo. Thanks for spelling out a use case with time-limited chunking. See #1290 for a size-limited use case.
I'm in need of this too. My use case is as follows:
I implemented a barcode scanner that scans the camera input live. That means I get barcode results every few milliseconds. However, the barcode detection is not perfect and sometimes detects a wrong barcode. For that case, I want to consume the barcode events chunked into 2-second windows and pick the barcode that was detected most frequently.
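Once the scan results arrive in 2-second chunks, picking the winner is an ordinary collection operation. Below is a minimal sketch. `mostFrequent` is a hypothetical helper, and the chunking itself is assumed to come from whatever operator this issue produces:

```kotlin
// Picks the value that occurs most often in one chunk of scan results.
// eachCount() keeps first-seen order and maxByOrNull keeps the first maximum,
// so ties go to the value that was seen first in the chunk.
fun <T> mostFrequent(chunk: List<T>): T? =
    chunk.groupingBy { it }     // group equal barcode values
        .eachCount()            // value -> number of occurrences
        .maxByOrNull { it.value }
        ?.key                   // null for an empty chunk
```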
@PaulWoitaschek I had a requirement for a similar (though not exactly the same) operator and ended up implementing my own (final implementation at the end), in case it helps.
I have a need for time-based `chunked` too. I have a stream of analytics events originating in a mobile app. I would like to chunk them in time-based windows, so that I can send them to the backend periodically without opening too many network connections. Say, chunk events for a minute, send them over the network, and repeat for the next batch.
For this task I care for:
I do not care for:
I had a discussion earlier with @elizarov about a size-based chunk operator: https://github.com/Kotlin/kotlinx.coroutines/pull/1558#issuecomment-659485243
So a single operator for time- and size-based chunks is desired, one that can chunk by size, by time, or by both.
I would like to propose a base operator with three parameters:
public fun <T> Flow<T>.chunked(chunkDuration: Duration, minSize: Int, maxSize: Int): Flow<List<T>>
chunkDuration: how long a single chunk is accumulated. If set to 0, this could mean no time limit.
maxSize: the maximum size of a chunk. If a chunk reaches the maximum size, it should be emitted before the time limit is reached. In that case, I think it would be intuitive for the "chunking timer" to be reset. Would default to a special NO_MAXIMUM value.
minSize: for my use case, I would not like to receive empty chunks. I would even like to specify a minimum size of about 10. If a chunk does not reach this size in the specified time, I am OK with its values not being emitted in the "current chunk" and being added to the front of the next chunk instead. In other use cases, though, it might be convenient to receive chunks at regular intervals, even empty ones, so minSize = 0 could be specified in that case. Min size could also be used to decide what to do when the source flow completes: we need to decide whether to emit a "partial", not fully filled chunk as the last emission. In that case, the size of the last chunk could be checked against the minSize parameter. Would default to 1.
- I think the most common use case is collecting multiple values for batch processing, where empty chunks are not desirable.
An important consideration is how exactly time-based chunking should be implemented. Should chunking start at the moment of collection, or only after the first value is received? Should a new chunking window start right after the previous window, or only after the first value for a new chunk is received?
Based on my needs: if I want chunks not to be empty, I am OK with emissions not happening at regular intervals. In fact, I want chunking to gather as many values as possible in a time frame. So I would propose that, if minSize > 0, chunking should start only after the first value for a new chunk is received. That way, we won't be "chunking air". Only minSize = 0 would mean that we want regular chunk emissions no matter the size. In that case, chunking should start at the moment of collection, and each new chunking window should follow right after the previous one.
The above may not be very intuitive, however. Maybe we should always start time-based chunking right after collection and let one chunking window follow another. Chunks that do not fulfill minSize would just be added to the following chunk, and so on, until a subsequent chunk fulfills the minSize parameter.
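The "carry undersized chunks over" rule can be stated as a pure function over time windows that have already been cut. This is a hypothetical sketch of the semantics only: `mergeUndersizedWindows` is not a proposed API name, and it resolves the open question about the final partial chunk by emitting it when it is non-empty:

```kotlin
// Hypothetical semantics: time windows follow one another, and a window holding fewer
// than minSize items is not emitted but carried over into the next window.
// Each inner list of `windows` stands for the items that arrived during one window.
fun <T> mergeUndersizedWindows(windows: List<List<T>>, minSize: Int): List<List<T>> {
    require(minSize >= 0) { "minSize must not be negative" }
    val result = mutableListOf<List<T>>()
    val pending = mutableListOf<T>()
    for (window in windows) {
        pending.addAll(window)
        if (pending.size >= minSize) {
            result.add(pending.toList())
            pending.clear()
        }
    }
    // Open question from the proposal: what to do with a final undersized chunk.
    // This sketch emits it rather than dropping it.
    if (pending.isNotEmpty()) result.add(pending.toList())
    return result
}
```

With minSize = 0 every window is emitted, empty ones included. This matches the "regular emissions no matter the size" case.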
As for the exact API shape, I think we could have one operator with an @Experimental Duration param and another with just chunkDurationMs: Long, so as not to tie this operator's experimental status to kotlin.time.
I also thought that for purely size-based chunking it would still be convenient to have a separate operator without the Duration param. Otherwise, with `chunked(chunkDuration: Duration, minSize: Int, maxSize: Int)` it is IMHO too easy to set both the duration and maxSize to 0 and end up with a runtime validation error. Also, this signature does not make it evident that Duration == 0 is allowed if we want only size-based chunking.
I have also decided to omit a trailing lambda `{ chunk: List<T> -> userFunction(chunk) }` specifying what to do with a chunk after its emission (it would avoid emitting a list when something more specific is needed downstream). I think that `flow.chunked(10.seconds).map { chunk -> ... }` is enough.
To sum up:
public fun
public fun
public fun
Please remember to consider the natural batching use case (#902) in the design. How would one express "always emit a chunk as soon as possible" in the proposed API? Basically: buffer upstream emissions while the collector is busy with the previous chunk, and emit a chunk as soon as the collector is ready to receive.
Good point @pacher
To express this additional use case in a single operator, I think something more expressive is needed: a way to specify precisely what needs to happen for a chunk to be emitted. How about:
chunk(emitWhen: ChunkConstraint)
where
sealed class ChunkConstraint {
class BySize(val size: Int) : ChunkConstraint()
class ByTime(val timeLimit: Duration, val minSize: Int = 1, val maxSize: Int = NO_MAXIMUM) : ChunkConstraint()
class UntilCollectorIsReady(val minSize: Int = 1, val maxSize: Int = NO_MAXIMUM): ChunkConstraint()
}
...expressing the three most popular use cases? Do you think the last case, CollectorIsReady, needs a max size customization option? Perhaps someone would like to have a time constraint there too?
One thing I regret is that a casual user will no longer be able to type `chunk(10.seconds)` and be done with it, like in RxJava. But `chunk(ByTime(10.seconds))` looks pretty good too.
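Here is a compilable version of the ChunkConstraint idea for reference. The proposal does not define NO_MAXIMUM, so it is assumed here to be Int.MAX_VALUE. The `describe` function is only an illustration of how a sealed class lets the compiler check that every chunking strategy is handled:

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Assumption: the proposal's NO_MAXIMUM sentinel is Int.MAX_VALUE.
const val NO_MAXIMUM: Int = Int.MAX_VALUE

sealed class ChunkConstraint {
    class BySize(val size: Int) : ChunkConstraint()
    class ByTime(val timeLimit: Duration, val minSize: Int = 1, val maxSize: Int = NO_MAXIMUM) : ChunkConstraint()
    class UntilCollectorIsReady(val minSize: Int = 1, val maxSize: Int = NO_MAXIMUM) : ChunkConstraint()
}

// Exhaustive `when`: adding a new strategy makes this fail to compile until it is handled.
fun describe(constraint: ChunkConstraint): String = when (constraint) {
    is ChunkConstraint.BySize -> "every ${constraint.size} items"
    is ChunkConstraint.ByTime -> "every ${constraint.timeLimit}"
    is ChunkConstraint.UntilCollectorIsReady -> "as soon as the collector is ready"
}
```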
I was thinking more along the lines of what @elizarov suggested in this comment: the duration is set to zero and the size limit is set to a very big value.
I think a duration of 0 better reflects the natural batching case, and there should be a special value for no time limit, similar to the suggested NO_MAXIMUM for size.
Then you can do `chunk(10.seconds)`.
Max size does not make sense with a 0 duration, but min size does and could be used. Upd.: After some thought, natural batching and size limits somewhat contradict each other.
P.S. Personally, I would add an optional parameter like `batch = true` to the existing `buffer` operator, or even introduce a new `batch` operator (example). I am worried that making a single implementation of `chunked` also serve natural batching might result in a suboptimal implementation of the latter.
Do you think that last case CollectorIsReady needs max size customization option? Perhaps someone would like to have a time constraint there too?
Not really. A time constraint is a direct contradiction: which one is it? Should I emit as soon as possible, meaning now, or wait for a duration? Restricting min size would be the same as using chunked with min size alone, without any time limit. Max size: when max size is reached we can't emit anyway, because the collector is busy. Perhaps there is a use case here for suspending upstream at max size. Otherwise, I can hardly imagine somebody implementing batch processing with an upper bound on the batch size. Maybe, but nothing comes to mind.
All in all, duration 0 for natural batching is a clear winner for API design in my opinion. `ChunkConstraint` is heavy and is not needed yet.
`chunk(duration = 0, maxSize = 10)` and `chunk(duration = 10.seconds, maxSize = NO_MAXIMUM)` both ignore how fast downstream processing is. If the collector cannot keep up, they suspend the producer, as most other operators do. (One can change that by introducing a `buffer` operator upstream, which is a very good separation of concerns IMHO.) Then why should `chunk(duration = 0, maxSize = NO_MAXIMUM)` suddenly, out of the blue, not suspend upstream? Why would `chunk(duration = 0, maxSize = 100_000_000)` wait for those 100_000_000 emissions, while `chunk(duration = 0, maxSize = NO_MAXIMUM)` emits as soon as the collector is ready (potentially much sooner than reaching a size of 100 million)? Trying to fit natural batching into some magic combination of duration and maxSize is IMHO not going to be intuitive at all.
That is why I tried to come up with a sealed class that clearly telegraphs the possible chunking behaviours. SizeLimit chunking does not need minSize or maxSize, just `size`. Natural batching does need maxSize (I think), but there it means something different than in TimeLimit chunking (and should probably be named differently).
All in all, I see that it is not easy to fit natural batching in here. Maybe, instead of trying, we could come up with something more general / abstract, similar to `Flow.transform { ... }`:
chunk(nullableInterval) { ticker, downStream ->
// can construct own buffer / store last value / whatever
// can start/stop timer as needed
// can emit when needed
onNewValue { newValue -> ... }
onTimerTick { ...}
onCollectorReady { ... }
}
...kind of like a select expression. You could freely omit / start / stop the timer and use / ignore the onCollectorReady clause. And, most importantly, emit whenever and whatever you wish.
Edit: Simplified signature. But boy, that is much more complicated than a simple `chunk(10.seconds)`. Still, time-based chunking could be implemented by:
timeChunk(interval) = chunk(interval) { downstream, ticker ->
val acc = mutableListOf<T>()
ticker.start()
onNewValue { newValue -> acc.add(newValue) }
onTimerTick { downstream.emit(acc.toList()); acc.clear() }
}
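The `timeChunk` pseudocode above can be approximated with existing primitives: a producer coroutine feeds a channel, a ticker coroutine marks window boundaries, and `select` reacts to whichever comes first. This is a sketch under these assumptions:
- `timeChunked` is a hypothetical name.
- Windows start at collection and follow one another, and empty windows are skipped rather than emitted.
- kotlinx.coroutines 1.6 or newer is available.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical operator: emits everything collected during each interval.
fun <T> Flow<T>.timeChunked(interval: Duration): Flow<List<T>> = flow {
    require(interval.isPositive()) { "interval must be positive" }
    coroutineScope {
        val values = Channel<T>(Channel.BUFFERED)
        val ticks = Channel<Unit>(Channel.CONFLATED)
        // Producer: collects upstream concurrently with the emitting loop below.
        launch {
            this@timeChunked.collect { values.send(it) }
            values.close()
        }
        // Ticker: fixed windows starting at collection; CONFLATED, so send never suspends.
        val ticker = launch {
            while (true) {
                delay(interval)
                ticks.send(Unit)
            }
        }
        val chunk = mutableListOf<T>()
        var upstreamDone = false
        while (!upstreamDone) {
            // true = window boundary, false = a value arrived or upstream completed
            val tick = select<Boolean> {
                values.onReceiveCatching { result ->
                    if (result.isClosed) upstreamDone = true else chunk.add(result.getOrThrow())
                    false
                }
                ticks.onReceive { true }
            }
            if (tick && chunk.isNotEmpty()) {
                emit(chunk.toList())
                chunk.clear()
            }
        }
        ticker.cancel()
        // Upstream completed: flush what is left without waiting for the next tick.
        if (chunk.isNotEmpty()) emit(chunk.toList())
    }
}

fun demoTimeChunked(): List<List<Int>> = runBlocking {
    flow {
        emit(1)
        emit(2)
        delay(250) // arrives after the first 100 ms window has closed
        emit(3)
    }.timeChunked(100.milliseconds).toList()
}
```

Ticks are conflated, so a slow collector stretches a window instead of queueing ticks. This is consistent with the point made later in the thread that the time limit may be breached when downstream is busy.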
@circusmagnus I am waiting for clarification here before discussing it further.
Before we delve into the design, we need to gather and discuss use cases.
@circusmagnus has great use cases that call for `maxSize` and `duration` chunk constraints. I am not sure I understand the motivation for `minSize`, though. The whole `minSize` story opens too many questions (does time start counting only when `minSize` is filled, etc.). Can you live without it?
Well, I think all three use cases:
... can live without minSize. It had two motivations:
`transform`) after all. After a few days, I think @pacher may be right. Let it be:
chunk(size = NO_CONSTRAINT, duration = 0)
with usage:
chunk()
- means natural batching with a default upper limit. Emit a package ASAP, but wait for at least one element. Hold a default buffer for elements (max size = 64). Suspend upstream when the buffer fills up.
buffer(1024).chunk()
- means natural batching with the buffer enlarged to 1024 elements.
buffer(UNLIMITED).chunk()
- means natural batching with no upper limit
chunk(10)
- means emit in chunks of size 10
chunk(10.minutes)
- emit every 10 minutes. Chunks will have at least one element or will not be emitted(?). Edit: or perhaps just emit 0-sized chunks and let the user filter them out with a separate operator.
chunk(10, 10.minutes)
- emit when size 10 is reached OR 10 minutes have passed
...come to think of it, maybe we could somehow insert AND or OR between the size and duration parameters for more customization:
chunk(size = 10 AND timePassed = 10.minutes)
chunk(size = 10 OR timePassed = 10.minutes)
That would solve all minSize issues without an additional parameter.
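Read literally, the AND/OR idea reduces to a predicate that is evaluated whenever an element arrives or a timer fires. This is a hypothetical sketch in which `LimitMode` and `shouldEmit` are illustrative names only. With OR, the two limits behave like `chunk(10, 10.minutes)`. With AND, the size acts as a minimum: an elapsed window is held back until 10 items are present.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

// How the two limits are combined (hypothetical).
enum class LimitMode { AND, OR }

// Decides whether the chunk accumulated so far should be emitted.
fun shouldEmit(
    bufferedCount: Int,
    elapsed: Duration,
    sizeLimit: Int,
    timeLimit: Duration,
    mode: LimitMode,
): Boolean {
    val sizeReached = bufferedCount >= sizeLimit
    val timeReached = elapsed >= timeLimit
    return when (mode) {
        LimitMode.OR -> sizeReached || timeReached
        LimitMode.AND -> sizeReached && timeReached
    }
}
```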
The discussion seems to have stalled. Perhaps we should take the issue to the coroutines Slack channel? Or do something else to attract more people, gather use cases and discuss the design?
My use case is also batching (not sure if natural or not).
Specifically: emit if either the buffer reaches the size limit, or the time limit is reached and the buffer contains any elements (although I could also easily filter out empty lists in the flow later).
Crucially, the timer must be reset after emitting. Otherwise, the next chunk might be emitted too early because the previous timer fires.
Also, backpressure and such must be maintained, of course.
OK, I will have another go at this issue. @elizarov mentioned that there is no obvious case for minSize and that it complicates things. So let's assume that the minimum chunk size is always one. That seems to fit all use cases:
No minSize param then.
That leaves us with duration and size params:
fun <T> Flow<T>.chunked(interval: Duration, size: Int): Flow<List<T>>
How would we express our use cases with it?
Natural batching:
chunked(interval = 0, size = someLargishValue) // interval 0: emit as soon as possible; size: how big a buffer you are willing to store
Time-based chunking:
chunked(interval = x, size = maxAcceptableBuffer) // interval: your main consideration
Size-based chunking: this one is trickier, as we do not need a duration at all:
chunked(interval = NO_INTERVAL, size = desiredBufferSize) // NO_INTERVAL: technically Int.MAX_VALUE or such
Since our `size` parameter is either the maximum size for time-based chunking or simply the desired size, we should try to emit immediately when it is reached, suspending upstream if need be.
The `interval` param is a little different. We cannot guarantee that a chunk will be emitted exactly when the interval has passed. Since the interval does not relate to size, I think we can safely assume that it is OK to keep buffering elements after the interval has passed, even if we cannot emit because downstream is busy.
In other words, on reaching the size limit we suspend upstream until the emission happens, so a chunk cannot grow bigger than specified. On reaching the time limit we do not suspend upstream, whether we have emitted or are still waiting for downstream to get ready. Our time limit may be breached because downstream is busy, and we cannot prevent that.
That shapes our design into `chunked(intervalConstraint OR sizeConstraint)`, consistently across all use cases.
So the proposal boils down to
fun <T> Flow<T>.chunked(interval: Duration, size: Int): Flow<List<T>>
Proposed implementation (give or take; no sanity checks, etc.):
public fun <T> Flow<T>.chunked(interval: Duration, size: Int): Flow<List<T>> = scopedFlow { downstream ->
val buffer = Channel<T>(size)
val emitSemaphore = Channel<Unit>()
val collectSemaphore = Channel<Unit>()
launch {
collect { value ->
val hasCapacity = buffer.offer(value)
if (!hasCapacity) {
emitSemaphore.send(Unit)
collectSemaphore.receive()
buffer.send(value)
}
}
emitSemaphore.close()
buffer.close()
}
whileSelect {
emitSemaphore.onReceiveOrClosed { valueOrClosed ->
buffer.drain().takeIf { it.isNotEmpty() }?.let { downstream.emit(it) }
val shouldCollectNextChunk = valueOrClosed.isClosed.not()
if (shouldCollectNextChunk) collectSemaphore.send(Unit)
else collectSemaphore.close()
shouldCollectNextChunk
}
onTimeout(interval) {
downstream.emit(buffer.awaitFirstAndDrain())
true
}
}
}
Helper functions:
private suspend fun <T> ReceiveChannel<T>.awaitFirstAndDrain(): List<T> {
val first = receiveOrClosed().takeIf { it.isClosed.not() }?.value ?: return emptyList()
return drain(mutableListOf(first))
}
private tailrec fun <T> ReceiveChannel<T>.drain(acc: MutableList<T> = mutableListOf()): List<T> {
val item = poll()
return if (item == null) acc
else {
acc.add(item)
drain(acc)
}
}
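In kotlinx.coroutines 1.5 and later, `poll()` and `receiveOrClosed()` are deprecated in favour of `tryReceive()` and `receiveCatching()`. A `poll()`-based drain also stops early when T is nullable and a `null` element is buffered. Here is a sketch of the same two helpers on the newer API. The `...Available` names are hypothetical, chosen to avoid clashing with the helpers above:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Takes every element that is currently buffered, without suspending.
// tryReceive() distinguishes "no element" from a null element, so nullable T is safe.
fun <T> ReceiveChannel<T>.drainAvailable(acc: MutableList<T> = mutableListOf()): List<T> {
    var result = tryReceive()
    while (result.isSuccess) {
        acc.add(result.getOrThrow())
        result = tryReceive()
    }
    return acc // the channel is now empty or closed
}

// Suspends until the first element arrives, then drains the rest.
// Returns an empty list if the channel was closed normally, and rethrows a close cause.
suspend fun <T> ReceiveChannel<T>.awaitFirstAndDrainAvailable(): List<T> {
    val first = receiveCatching()
    if (first.isClosed) {
        first.exceptionOrNull()?.let { throw it }
        return emptyList()
    }
    return drainAvailable(mutableListOf(first.getOrThrow()))
}

fun demoDrain(): List<Int?> {
    val channel = Channel<Int?>(Channel.UNLIMITED)
    channel.trySend(1)
    channel.trySend(null)
    channel.trySend(3)
    return channel.drainAvailable()
}
```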
Plus an optimized, non-concurrent implementation for purely size-based chunking:
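private fun <T> Flow<T>.chunkedSizeBased(maxSize: Int): Flow<List<T>> = flow {
    val buffer = mutableListOf<T>()
    this@chunkedSizeBased.collect { value ->
        buffer.add(value)
        if (buffer.size == maxSize) {
            // drain() above is defined for ReceiveChannel only: emit a copy and reset the list
            emit(buffer.toList())
            buffer.clear()
        }
    }
    // Emit the last, possibly smaller, chunk
    if (buffer.isNotEmpty()) emit(buffer.toList())
}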
I'm using my own rough implementation for this to batch database updates.
upstreamFlowOfManyFrequentUpdates
.chunked(sizeLimit = 100_000, timeLimit = 2.seconds)
.collect { updates ->
// write updates to DB
}
I like the signature `chunked(sizeLimit: Int, timeLimit: Duration)`. It makes it quite clear that the two are in a `sizeLimit` OR `timeLimit` relation. Either of them could also take the value NO_LIMIT (= Int.MAX_VALUE). `timeLimit` could also take the form NATURAL_BATCHING (= 0).
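Here is a sketch of the `sizeLimit` OR `timeLimit` semantics, including the timer reset requested earlier in the thread. It rests on several assumptions:
- `chunkedBySizeOrTime` is a hypothetical name.
- Empty chunks are skipped.
- Values pass through a buffered channel, so upstream is suspended only once that channel is full. This is looser backpressure than suspending exactly at `sizeLimit`.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

// Hypothetical operator: emits when sizeLimit items are buffered OR timeLimit has passed
// since the window started. The timer restarts after every emission.
fun <T> Flow<T>.chunkedBySizeOrTime(sizeLimit: Int, timeLimit: Duration): Flow<List<T>> = flow {
    require(sizeLimit > 0) { "sizeLimit must be positive" }
    require(timeLimit.isPositive()) { "timeLimit must be positive" }
    coroutineScope {
        val values = Channel<T>(Channel.BUFFERED)
        val ticks = Channel<Unit>(Channel.CONFLATED)
        launch {
            this@chunkedBySizeOrTime.collect { values.send(it) }
            values.close()
        }
        fun startTimer(): Job = launch {
            delay(timeLimit)
            ticks.send(Unit)
        }
        var timer = startTimer()
        val chunk = mutableListOf<T>()
        var upstreamDone = false
        while (!upstreamDone) {
            val timedOut = select<Boolean> {
                values.onReceiveCatching { result ->
                    if (result.isClosed) upstreamDone = true else chunk.add(result.getOrThrow())
                    false
                }
                ticks.onReceive { true }
            }
            if (chunk.size >= sizeLimit || (timedOut && chunk.isNotEmpty())) {
                emit(chunk.toList())
                chunk.clear()
                // Reset the timer and drop a stale tick, so the next chunk gets a full window.
                timer.cancelAndJoin()
                ticks.tryReceive()
                timer = startTimer()
            } else if (timedOut) {
                timer = startTimer() // nothing buffered: just open a new window
            }
        }
        timer.cancel()
        if (chunk.isNotEmpty()) emit(chunk.toList())
    }
}

fun demoBySize(): List<List<Int>> = runBlocking {
    (1..5).asFlow().chunkedBySizeOrTime(sizeLimit = 2, timeLimit = 1.seconds).toList()
}

fun demoByTime(): List<List<Int>> = runBlocking {
    flow {
        emit(1)
        delay(300) // the 100 ms timer fires first, so 1 is emitted on its own
        emit(2)
    }.chunkedBySizeOrTime(sizeLimit = 10, timeLimit = 100.milliseconds).toList()
}
```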
Come to think of it, the non-concurrent size-based implementation I proposed above is not a good idea. Chunking should always happen in a different coroutine than downstream collection. I think having your upstream blocked by a slow consumer (like one sending chunk data over a network) is not desirable even in purely size-based chunking. It is also more consistent this way.
Backpressure is a desirable property in some systems, at least for our use case.
I agree. Backpressure will work all right in both cases. The main difference is that in the concurrent solution, backpressure will kick in a bit later.
For one, I would not like my analytics event "production" to be suspended while a network request with the previous chunk is in flight. I want it suspended only if a chunk hits the size limit.
For two, I think time- and size-based chunking should behave consistently when it comes to backpressure caused by the size-limit constraint.
I just want to note that there's an impl here that solves at least 50% of the use case: https://github.com/Kotlin/kotlinx.coroutines/issues/2193#issuecomment-705655058
You can use built-in operators to achieve time-based chunking:
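import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
@OptIn(FlowPreview::class) // sample() is a @FlowPreview API
private class TimedChunkFlow<T>(sourceFlow: Flow<T>, periodMs: Long) {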
private val chunkLock = ReentrantLock()
private var chunk = mutableListOf<T>()
val resultFlow = flow {
sourceFlow.collect {
// the chunk is reused before it's collected by "sample()"
val localChunk = chunkLock.withLock {
chunk.add(it)
chunk
}
emit(localChunk)
}
}.sample(periodMs).onEach {
chunkLock.withLock {
chunk = mutableListOf()
}
}
}
fun <T> Flow<T>.timedChunk(periodMs: Long): Flow<List<T>> = TimedChunkFlow(this, periodMs).resultFlow
Hi, I did something different from transforming a flow into a chunked flow, but I think it could be helpful for this issue.
My implementation transforms a ReceiveChannel into a chunked flow that emits chunks limited by size or by availability within a time period: https://github.com/ezeyniyev/kotlin-chunked-flow.git
The extension method `fun <T> ReceiveChannel<T>.asChunkedFlow(chunkSize: Int, timeout: Long): Flow<List<T>>` has two parameters:
- chunkSize: the maximum number of items in an emitted chunk
- timeout: the maximum number of milliseconds to wait to fill a chunk up to chunkSize items before emitting it (only non-empty chunks are emitted)
For example:
channel.asChunkedFlow(4, 130).collect { items ->
println("2: processing $items")
delay(100)
}
makes a receiver flow from that channel
that emits
You can connect many workers that consume chunks from that channel, for example:
launch {
channel.asChunkedFlow(4, 130).collect { items ->
println("1: processing $items")
delay(100)
}
}
launch {
channel.asChunkedFlow(4, 130).collect { items ->
println("2: processing $items")
delay(100)
}
}
launch {
channel.asChunkedFlow(4, 130).collect { items ->
println("3: processing $items")
delay(100)
}
}
Of course, multiple producers can send to that channel.
Based on the discussion here and some of my own experiences, I have updated PR https://github.com/Kotlin/kotlinx.coroutines/pull/2378
Current API proposal and usage:
myFlow.chunked(ChunkingMethod.ByTime(intervalMs = TimeUnit.MINUTES.toMillis(15), maxSize = 1024))
myFlow.chunked(ChunkingMethod.BySize(size = 5))
myFlow.chunked(ChunkingMethod.Natural())
myFlow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = TimeUnit.MINUTES.toMillis(15), maxSize = 20))
/**
* Groups emissions from this Flow into lists, according to the chosen ChunkingMethod. Time based implementations
* collect upstream and emit to downstream in separate coroutines - concurrently, like Flow.buffer() operator.
* Exact timing of emissions is not guaranteed, as it depends on collector coroutine availability.
*
* Size based chunking happens in a single coroutine and is purely sequential.
*
* Emissions always preserve order.
*
* It is possible to pass custom implementation of ChunkingMethod to chunked() operator.
*
 * @param method Defines constraints on chunk size and the time of its emission.
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.chunked(method: ChunkingMethod): Flow<List<T>> = with(method) { chunk() }
@ExperimentalCoroutinesApi
public interface ChunkingMethod {
public fun <T> Flow<T>.chunk(): Flow<List<T>>
public companion object {
/**
* Collects upstream and emits to downstream in separate coroutines - as soon as possible. If consumer keeps
* up with the producer, it emits lists with single element.
*
* In case of slow consumer, it groups emissions into bigger lists. When consumer "speeds up", chunks
* will get smaller.
*
* @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes"
* a chunk. If maxSize is not specified, then chunk may grow indefinitely until jvm runs out of memory.
*/
@Suppress("FunctionName")
public fun Natural(maxSize: Int = Int.MAX_VALUE): ChunkingMethod = NaturalChunking(maxSize)
/**
* Collects upstream into a buffer and emits its content as a list at every interval. When upstream completes
* (or is empty), it will try to emit immediately what is left of a chunk, omitting the interval.
*
* @param intervalMs Interval between emissions in milliseconds. Every emission happens only after
* interval passes, unless upstream Flow completes sooner.
*
* @param maxSize Maximum size of a single chunk. If reached, producer gets suspended until consumer "consumes"
* a chunk. If maxSize is not specified, then chunk may grow indefinitely until jvm runs out of memory.
*/
@Suppress("FunctionName")
public fun ByTime(intervalMs: Long, maxSize: Int = Int.MAX_VALUE): ChunkingMethod =
TimeBased(intervalMs, maxSize)
/**
* Collects upstream into a buffer and emits its content as a list at every interval or when its buffer reaches
* maximum size. When upstream completes (or is empty), it will try to emit immediately what is left of
* a chunk, omitting the interval and maxSize constraints.
*
* @param intervalMs Interval between emissions in milliseconds. Every emission happens only after
* interval passes, unless upstream Flow completes sooner or maximum size of a chunk is reached.
*
* @param maxSize Maximum size of a single chunk. If reached, it will try to emit a chunk, ignoring the
* interval constraint. If so happens, time-to-next-chunk gets reset to the interval value.
*/
@Suppress("FunctionName")
public fun ByTimeOrSize(intervalMs: Long, maxSize: Int): ChunkingMethod = TimeOrSizeBased(intervalMs, maxSize)
/**
* Collects upstream into a buffer and emits its content as a list, when specified size is reached.
* This implementation is purely sequential. If concurrent upstream collection and downstream emissions are
* desired, one can use a buffer() operator after chunking
*
* @param size Exact size of emitted chunks. Only the last emission may be smaller.
*/
@Suppress("FunctionName")
public fun BySize(size: Int): ChunkingMethod = SizeBased(size)
}
}
Full implementation details are in the PR: #2378. Basically, the size-based flow is a purely sequential, simple implementation that uses an ArrayList as an accumulator.
Time-based methods use a Channel as an accumulator plus some form of signaller (a Job, or a Channel for the TimeOrSize method) to indicate that the accumulator should be emptied before the interval passes. The accumulator Channel is emptied by calling tryReceive in a loop until it is empty / closed.
Natural chunking uses just two concurrent coroutines: one sends to the channel, and the other calls tryReceive in a loop (suspending while waiting for the first element).
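Here is a two-coroutine sketch of natural batching along the lines just described. `chunkedNaturally` is a hypothetical name, and this is not the code from #2378. One coroutine sends upstream values into a channel. The emitting loop suspends for the first element, then takes whatever else is already buffered, up to `maxSize`:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits whatever has accumulated while the collector was busy.
fun <T> Flow<T>.chunkedNaturally(maxSize: Int = Channel.UNLIMITED): Flow<List<T>> = flow {
    require(maxSize > 0) { "maxSize must be positive" }
    coroutineScope {
        // Channel.UNLIMITED == Int.MAX_VALUE, so the default buffer is unbounded;
        // otherwise the producer suspends once maxSize elements are waiting.
        val values = Channel<T>(maxSize)
        launch {
            this@chunkedNaturally.collect { values.send(it) }
            values.close()
        }
        while (true) {
            val first = values.receiveCatching() // suspend until at least one element
            if (first.isClosed) break
            val chunk = mutableListOf(first.getOrThrow())
            // Take what is already buffered, without suspending, up to maxSize.
            while (chunk.size < maxSize) {
                val next = values.tryReceive()
                if (!next.isSuccess) break
                chunk.add(next.getOrThrow())
            }
            emit(chunk)
        }
    }
}

fun demoNatural(): List<List<Int>> = runBlocking {
    (1..10).asFlow().chunkedNaturally(maxSize = 3).toList()
}
```

How the values are split into chunks depends on scheduling. Only order and the `maxSize` bound are guaranteed.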
@qwwdfsad @elizarov What do you think?
@circusmagnus thanks a lot for this proposal and PR. I just have a couple considerations regarding the public API.
Is it necessary to provide `ChunkingMethod` as an interface here? Nothing at all in `chunked(ChunkingMethod)` is reused across implementations. Using a custom implementation of `ChunkingMethod` is completely equivalent to just creating a custom extension `Flow<T>.something(): Flow<List<T>>`.
The only use case I can think of for an actual interface is a library that uses batches internally (via `chunked(ChunkingMethod)`) and still wants to let its users provide a custom chunking method. But in that case, the library could just take a function `Flow<T>.() -> Flow<List<T>>` as configuration.
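The "just take a function" alternative fits in a few lines. `Chunker` and `BatchUploader` are hypothetical names, and the one-element strategy in the demo only keeps the example self-contained:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// A chunking strategy is just a function from a flow of items to a flow of batches.
typealias Chunker<T> = Flow<T>.() -> Flow<List<T>>

// Hypothetical library class: it batches internally but lets the caller choose how.
class BatchUploader<T>(private val chunker: Chunker<T>) {
    fun batches(events: Flow<T>): Flow<List<T>> = events.chunker()
}

fun demoUploader(): List<List<Int>> = runBlocking {
    // Trivial strategy for illustration: one-element batches.
    val uploader = BatchUploader<Int> { map { listOf(it) } }
    uploader.batches((1..3).asFlow()).toList()
}
```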
I believe simple `chunked()` usages will look quite heavy in ordinary code with the `ChunkingMethod` parameter. I liked the initial design with 2 simple parameters. I understand that we have a problem with `ByTime` vs `ByTimeOrSize` taking the same parameters but having different semantics for the `maxSize` parameter (in `ByTime` we use `maxSize` only for backpressure, while in `ByTimeOrSize` we also use it to trigger emissions).
Maybe a simple solution to 2) would be to additionally define related high-level functions like `chunkedBySize()`, `chunkedByTime()`, etc., but that may be considered "operator explosion".
An alternative could be to simply define `chunked()` as taking 3 params, used internally to select a chunking method:
- `timeLimit`: emits the whole current buffer as soon as this is reached (the timer is reset after each actual emission). If the consumer is not ready, keep filling the buffer.
- `sizeLimit`: the maximum size of the chunks and also the size of the internal buffer. When this is reached, what happens depends on `onFullBuffer`.
- `onFullBuffer`: if `EMIT_OR_SUSPEND`, emits the whole buffer when `sizeLimit` is reached, or suspends the producer if the consumer is not ready when this happens. If `SUSPEND`, reaching `sizeLimit` only applies backpressure to the producer but never triggers emissions.
That is true: the `ChunkingMethod` interface does not provide any useful functionality. The problem it was meant to solve is fitting all four chunking use cases into a single operator, and (I hope) helping the end user discover the available chunking options. It was inspired by `SharingStarted`, an interface describing the sharing strategy for `SharedFlow`.
I very much like your idea of an `onFullBuffer` param, and of going back to more lightweight params in general.
I have two dilemmas with them:
`chunked(interval, maxSize, onFullBuffer)`. It would need a separate operator, no way around it IMHO. Otherwise you would have to write `chunked(interval = Int.MAX_VALUE, maxSize = 10, onFullBuffer = EMIT)`, which is even more work than `chunked(BySize(10))`. But I would be equally happy to merge a PR with such a (simpler) API.
You're right. I guess as soon as we switch to simple params, the dilemma will be around default values.
The defaults I was initially thinking about were:
interval
, period
)maxSize
So the use cases would look like this at the call site:
- `chunked(sizeLimit = 10)` = pure size-based chunking with backpressure
- `chunked(sizeLimit = 10, timeLimit = Duration.seconds(2))` = size or time, with size-based backpressure
- `chunked(timeLimit = Duration.seconds(2))` = pure time-based chunking (but no backpressure, dangerous)
- `chunked(timeLimit = 0)` = unbounded natural batching (no backpressure)
- `chunked(sizeLimit = 10, timeLimit = 0)` = bounded natural batching (with backpressure)
- `chunked(sizeLimit = 10, timeLimit = Duration.seconds(2), onFullBuffer = SUSPEND)` = pure time-based chunking with backpressure; feels pretty complex to express
I'm not a fan of the unbounded default for time-based chunking, though... :/ Other defaults could be considered, but unfortunately there may never be a sweet spot.
All in all, I sort of agree that the interface (or a potential sealed class) would make it more discoverable. Just from seeing this API, I would never have known about natural batching without this discussion.
So maybe your current PR plus some convenient overloads of `chunked()` would be the best compromise.
I'd like to hear the Kotlin team's opinion about this.
As I said, I would go with two or three operators, without `ChunkingMethod`:
Flow<T>.chunked(size: Int): Flow<List<T>>
- simple size-based
Flow<T>.chunked(interval: Long/Duration, maxChunkSize: Int = Int.MAX_VALUE, onFullBuffer: OnFullBuffer = EMIT)
- for all the other use cases
That way, `chunked(size = 10)` and `chunked(interval = 10.seconds)` should elegantly cover the basic needs, with more optional customization for the time-based version. `chunked(interval = 0.seconds)` is a bit cryptic, but good enough too.
I am sticking to the original param names, as `timeLimit` sounds unobvious to me ("Chunked by timeLimits? timeLimit = 0 for natural batching?"). But maybe that's my poor English. :)
Half a year later... What about such a signature?
fun <T> Flow<T>.chunk(
maxSize: Int,
naturalBatching: Boolean = false,
delayUntilNext: (suspend (previousChunk: List<T>?) -> Unit)? = null
): Flow<List<T>>
example usage:
flow.chunk(10)
- simple size chunking
flow.chunk(512) { delay(5.minutes) }
- accumulate values for 5 minutes with max size of 512
flow.chunk(512, naturalBatching = true)
- natural batching variant with max size
flow.chunk(512) { previousChunk -> if (previousChunk?.size == 512) Unit else delay(5.seconds) }
- speed up if buffer is getting full before chunk emission
flow.chunk(512) { semaphoreChannel.receive() }
- emit chunk after signal from some external source
Pros:
Cons:
- Still not as simple as `flow.chunk(maxSize = 512, interval = 5.minutes)`.
I don't think it's a problem; it's probably worth adding an overload that uses the proposed function under the hood. kotlinx.coroutines already has such simplified APIs, which use a more generic version of an API to provide a more case-specific function.
Adding a variant on the "size" use case: when the downstream consumer is an API call, it may have constraints on the set of data, such as:
Both involve maintaining and checking state to determine if chunk should be emitted.
It would be helpful if `chunked` could use a "batch full" predicate, of which "size" is one possible value (and likely the most frequently used one); others are "weight" and "size or weight".
From existing Java implementations of this (closed source), "weight" can default to Long.MAX_VALUE and the weighing function can default to return 0. This assumes an implementation that tracks both, which may not be the case for a Flow-based variant.
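Here is a sequential sketch of a "batch full" predicate that combines count and weight. It uses the defaults mentioned above: a weight limit of Long.MAX_VALUE and a weigher returning 0. `chunkedByWeight` is a hypothetical name. An item that is heavier than `maxWeight` on its own still forms a single-item batch, and very large weights could overflow the running Long sum:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: a batch is "full" when it holds maxCount items, or when adding
// the next item would push its total weight above maxWeight.
fun <T> Flow<T>.chunkedByWeight(
    maxCount: Int = Int.MAX_VALUE,
    maxWeight: Long = Long.MAX_VALUE,
    weigher: (T) -> Long = { 0L },
): Flow<List<T>> = flow {
    require(maxCount > 0) { "maxCount must be positive" }
    val batch = mutableListOf<T>()
    var batchWeight = 0L
    this@chunkedByWeight.collect { item ->
        val weight = weigher(item)
        // Close the current batch first if this item would exceed the weight limit.
        if (batch.isNotEmpty() && batchWeight + weight > maxWeight) {
            emit(batch.toList())
            batch.clear()
            batchWeight = 0L
        }
        batch.add(item)
        batchWeight += weight
        if (batch.size >= maxCount) {
            emit(batch.toList())
            batch.clear()
            batchWeight = 0L
        }
    }
    if (batch.isNotEmpty()) emit(batch.toList())
}

fun demoByWeight(): List<List<String>> = runBlocking {
    flowOf("aa", "bbb", "c", "dddd")
        .chunkedByWeight(maxWeight = 5) { it.length.toLong() }
        .toList()
}

fun demoByCount(): List<List<Int>> = runBlocking {
    (1..5).asFlow().chunkedByWeight(maxCount = 2).toList()
}
```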
I'm using this implementation, which relies only on standard functions. I'm curious to know if there's anything to improve.
It is a bit wasteful, since it launches a coroutine for every added value, but the implementation is clean IMO since there's no polling.
fun <T> Flow<T>.chunked(maxSize: Int, interval: Duration) = channelFlow {
val buffer = mutableListOf<T>()
var flushJob: Job? = null
collect { value ->
flushJob?.cancelAndJoin()
buffer.add(value)
if (buffer.size >= maxSize) {
send(buffer.toList())
buffer.clear()
} else {
flushJob = launch {
delay(interval)
if (buffer.isNotEmpty()) {
send(buffer.toList())
buffer.clear()
}
}
}
}
flushJob?.cancelAndJoin()
if (buffer.isNotEmpty()) {
send(buffer.toList())
buffer.clear()
}
}
EDIT: updated based on circusmagnus's comments.
Replacing `flushJob?.cancel()` with `flushJob?.cancelAndJoin()` should solve it. Otherwise, it indeed looks clean and simple. :)
We also need this in IJ
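```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.bufferList(size: Int): Flow<List<T>> {
    return flow {
        val buffer = mutableListOf<T>()
        this@bufferList.collect {
            buffer.add(it)
            if (buffer.size == size) {
                // Emit a copy: the buffer itself is cleared right after emitting.
                this.emit(buffer.toList())
                buffer.clear()
            }
        }
    }
}
```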
Can this code be improved?
I guess that won't work, because it won't emit the remaining elements if the flow terminates while the buffer is not exactly full.
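A sketch of a corrected version that keeps the signature of the snippet above: it emits a copy of the buffer (so a collector that keeps the list doesn't see it cleared afterwards) and flushes the partially filled remainder when upstream completes. `bufferList` is the name from the snippet, not a library function.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.bufferList(size: Int): Flow<List<T>> {
    require(size > 0) { "size must be positive" }
    return flow {
        val buffer = ArrayList<T>(size)
        this@bufferList.collect {
            buffer.add(it)
            if (buffer.size == size) {
                // Emit a copy, because the buffer is cleared and reused right after.
                emit(buffer.toList())
                buffer.clear()
            }
        }
        // Upstream completed: emit the partially filled last chunk, if any.
        if (buffer.isNotEmpty()) emit(buffer.toList())
    }
}
```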
It would be helpful if chunked could use a "batch full" predicate, of which "size" is one possible value (and likely the most frequently used one); others are "weight" and "size or weight".
I just want to echo this. I'm currently using a `chunked` implementation to post logs to a service. I would like to meet these two requirements:
However, timeliness is not important in my use case. If emissions are buffered or suspended to satisfy the above requirements, that is fine.
I would love this type of operator in stdlib. My use case is chunking Bluetooth advertisements on Android so that we can determine when a device hasn't been seen before, and when a device has been turned off or is out of range.
Copied from #4069
I have several cases where I need to group flow events by time and then combine them into something. The simplest example is averaging. If I have real-time data, I frequently want to take all points in a given time window and return only the averaged value for that time frame (or a default value if no events happened in that time window). I have several such cases in VisionForge (visualization library) and Controls-kt (device data acquisition).
I would call it `sample`, but that name is already taken, so it could be something like `chunkedByTime`, to align with the stdlib `chunked` method for collections. So, it could look like this: fun Flow
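A possible shape for the proposed `chunkedByTime`, sketched under assumptions that the comment leaves open: windows are consecutive and roughly `period` long (the next one starts after the previous one is sent), a window is emitted when its timer fires even if it is empty, and the last partial window is emitted when upstream completes. `chunkedByTime` is the hypothetical name suggested above, not an existing API.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical operator: consecutive windows of roughly `period`. Each window is emitted
// when its timer fires, even if it is empty, so callers can map empty windows to a default.
fun <T> Flow<T>.chunkedByTime(period: Duration): Flow<List<T>> {
    require(period.isPositive()) { "period must be positive" }
    return channelFlow {
        val lock = Mutex()
        val window = ArrayList<T>()

        val timer = launch {
            while (true) {
                delay(period)
                // Take and send the window without cancellation, so stopping the timer
                // cannot drop a window that was already taken.
                withContext(NonCancellable) {
                    val chunk = lock.withLock { window.toList().also { window.clear() } }
                    send(chunk)
                }
            }
        }
        this@chunkedByTime.collect { value -> lock.withLock { window.add(value) } }
        // Upstream completed: stop the timer, then flush the last partial window.
        timer.cancelAndJoin()
        val rest = lock.withLock { window.toList() }
        if (rest.isNotEmpty()) send(rest)
    }
}
```

For the averaging case, each window can then be reduced downstream, e.g. `.map { if (it.isEmpty()) default else it.average() }`, which yields the default value for windows without events.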
To make the API more universal, one could provide an external signal trigger, so that a chunk is emitted whenever a signal is received from a flow or channel.
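The trigger variant could look like this, assuming a hypothetical `chunkedBy(trigger)` operator in which every emission of `trigger` closes the current chunk. A channel can serve as the trigger via `receiveAsFlow()`, and time-based chunking becomes the special case of a periodic trigger. Empty chunks are skipped here; that choice is an assumption.

```kotlin
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical operator: every value emitted by `trigger` flushes the elements collected so far.
fun <T> Flow<T>.chunkedBy(trigger: Flow<Any?>): Flow<List<T>> = channelFlow {
    val lock = Mutex()
    val chunk = ArrayList<T>()

    val triggerJob = launch {
        trigger.collect {
            // Not cancellable, so stopping the trigger cannot drop a chunk already taken.
            withContext(NonCancellable) {
                val ready = lock.withLock { chunk.toList().also { chunk.clear() } }
                if (ready.isNotEmpty()) send(ready)
            }
        }
    }
    this@chunkedBy.collect { value -> lock.withLock { chunk.add(value) } }
    // Upstream completed: stop listening to the trigger and flush the remainder.
    triggerJob.cancelAndJoin()
    val rest = lock.withLock { chunk.toList() }
    if (rest.isNotEmpty()) send(rest)
}
```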
Another API consideration is usage on numbers. For my purposes, a List is OK, since it can be grouped later, which gives flexibility. But using a List of numbers impacts performance. So, one could provide an inline method that transforms an array of numbers internally in the flow evaluation loop. Like fun Flow
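For the numeric case, one way to avoid building a `List<Double>` per window is to fold values into a running sum and count inside the operator. A sketch, assuming a hypothetical `averageByTime` operator with consecutive windows of roughly `period` that emits `default` for windows without values. The values still arrive boxed, because `Flow<Double>` is generic, but they are not stored.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Hypothetical operator: emits the average of each time window instead of the window itself.
fun Flow<Double>.averageByTime(period: Duration, default: Double = Double.NaN): Flow<Double> {
    require(period.isPositive()) { "period must be positive" }
    return channelFlow {
        val lock = Mutex()
        var sum = 0.0
        var count = 0

        // Returns the current window's average (or `default` if it is empty) and resets it.
        suspend fun takeAverage(): Double = lock.withLock {
            val average = if (count == 0) default else sum / count
            sum = 0.0
            count = 0
            average
        }

        val timer = launch {
            while (true) {
                delay(period)
                withContext(NonCancellable) { send(takeAverage()) }
            }
        }
        this@averageByTime.collect { value ->
            lock.withLock {
                sum += value
                count++
            }
        }
        // Upstream completed: stop the timer and flush the last window if it has values.
        timer.cancelAndJoin()
        if (lock.withLock { count > 0 }) send(takeAverage())
    }
}
```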
`sample` and `debounce` have similar functionality, but they return only one value, not all of them. I think the implementation could share some code.
Hey guys from 2024! Does anyone have a production-ready snippet for this? I'm not smart enough to easily identify the corner cases when implementing this. So far I've seen:
Option A
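```kotlin
import java.time.Duration
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Emits up to `size` elements at a time, or whatever has accumulated when the ticker fires.
// Caveats: upstream is polled every 10 ms, and an element received just as that 10 ms
// timeout fires may be dropped (receiveCatching has a prompt cancellation guarantee).
@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
fun <T> Flow<T>.bufferTimeout(size: Int, duration: Duration): Flow<List<T>> {
    require(size > 0) { "Window size should be greater than 0" }
    require(duration.toMillis() > 0) { "Duration should be greater than 0" }
    return flow {
        coroutineScope {
            val events = ArrayList<T>(size)
            val tickerChannel = ticker(duration.toMillis())
            try {
                var hasTimedOut = false
                var tickerJob: Job? = null
                val upstreamValues = produce { this@bufferTimeout.collect { send(it) } }
                while (isActive) {
                    // At most one job waits for the next tick at a time.
                    if (tickerJob?.isActive != true) {
                        tickerJob = launch {
                            tickerChannel.receive()
                            hasTimedOut = true
                        }
                    }
                    val result = withTimeoutOrNull(10) { upstreamValues.receiveCatching() }
                    if (result != null && result.isClosed) {
                        // Upstream completed: rethrow its failure, or flush the rest and stop.
                        result.exceptionOrNull()?.let { throw it }
                        if (events.isNotEmpty()) emit(events.toList())
                        tickerJob?.cancel()
                        break
                    }
                    if (result != null && result.isSuccess) events.add(result.getOrThrow())
                    if (events.size == size || (hasTimedOut && events.isNotEmpty())) {
                        emit(events.toList())
                        events.clear()
                        tickerJob?.cancel()
                        hasTimedOut = false
                    }
                }
            } finally {
                tickerChannel.cancel()
            }
        }
    }
}
```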
Option B
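```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*

// Usage: TimedChunkFlow(source, periodMs = 1000).resultFlow
// Caveats: `sample` emits nothing for windows without new elements, and it does not emit
// the latest value if upstream completes before the next tick, so a final partial chunk can be lost.
@OptIn(FlowPreview::class)
private class TimedChunkFlow<T>(sourceFlow: Flow<T>, periodMs: Long) {
    private val chunkLock = ReentrantLock()
    private var chunk = mutableListOf<T>()
    val resultFlow = flow {
        sourceFlow.collect {
            // the chunk is reused before it's collected by "sample()"
            val localChunk = chunkLock.withLock {
                chunk.add(it)
                chunk
            }
            emit(localChunk)
        }
    }.sample(periodMs).onEach {
        // Start a fresh chunk once the sampled one has been handed downstream.
        chunkLock.withLock {
            chunk = mutableListOf()
        }
    }
}
```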
Option C
(Identical to the `channelFlow`-based `chunked(maxSize, interval)` snippet posted earlier in this thread.)
I also checked the proposal https://github.com/Kotlin/kotlinx.coroutines/pull/2378/files, but since it uses internal functions, I'm unable to use it.
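For comparison, a sketch that uses only public API: upstream is moved into a channel with `produce`, and `select` waits for either the next element or the chunk's deadline via `onTimeout`. Assumptions: the deadline is counted from the first element of a chunk and is not reset by later elements (unlike Option C), there is no polling (unlike Option A), and empty chunks are never emitted. `produce` and `onTimeout` are marked `@ExperimentalCoroutinesApi`, hence the opt-in. This is not the implementation from the linked PR.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Emits a chunk when it reaches maxSize, when `interval` has passed since its first
// element arrived, or when upstream completes. Empty chunks are never emitted.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunked(maxSize: Int, interval: Duration): Flow<List<T>> {
    require(maxSize > 0) { "maxSize must be positive" }
    require(interval.isPositive()) { "interval must be positive" }
    return flow {
        coroutineScope {
            val upstream = produce { this@chunked.collect { send(it) } }
            val chunk = ArrayList<T>(maxSize)
            var chunkStart = TimeSource.Monotonic.markNow()
            while (true) {
                val result: ChannelResult<T>? = if (chunk.isEmpty()) {
                    upstream.receiveCatching() // nothing buffered, so no deadline yet
                } else {
                    select<ChannelResult<T>?> {
                        // Listed first so that an expired deadline wins over a ready element.
                        onTimeout(interval - chunkStart.elapsedNow()) { null }
                        upstream.onReceiveCatching { it }
                    }
                }
                when {
                    result == null -> { // deadline reached
                        emit(chunk.toList())
                        chunk.clear()
                    }
                    result.isClosed -> { // upstream completed (or failed)
                        result.exceptionOrNull()?.let { throw it }
                        break
                    }
                    else -> {
                        if (chunk.isEmpty()) chunkStart = TimeSource.Monotonic.markNow()
                        chunk.add(result.getOrThrow())
                        if (chunk.size >= maxSize) {
                            emit(chunk.toList())
                            chunk.clear()
                        }
                    }
                }
            }
            if (chunk.isNotEmpty()) emit(chunk.toList())
        }
    }
}
```

With `flow.chunk(maxSize = 512, interval = 5.minutes)` from earlier in the thread in mind, this sketch would emit a chunk after 512 elements or five minutes after the chunk's first element, whichever comes first.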
Currently, Flow supports only a `buffer` operator with a capacity. It would be useful to buffer elements within a specified time range:
flow.buffer(Duration.ofSeconds(5)).collect {...}