Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Operator to chunk a single Flow<T> into a Flow<Flow<T>> #2989

Open Flockavelli opened 2 years ago

Flockavelli commented 2 years ago

I essentially want a stream clipping/chunking operator -- the opposite of FlatMap/Merge.

I have a video stream which is a Flow<Bitmap>.

I map the individual Bitmap frames through an object-detection algorithm and get a Flow<FramesWithDetections>.

FramesWithDetections is a Pair<Bitmap, List<Detection>> where the list can be empty if nothing is detected, or could contain bounding rectangles of objects that are detected in the frames.

What I want is an operator that could take the Flow<FramesWithDetections> and transform it into a Flow<Flow<Bitmap>> where the inner Flow<Bitmap> is a continuous flow of sequential frames (a "clip") where I detected objects for that whole duration.

I don't want just one of these, I want a flow of these flows (clips) every time objects are detected in sequence.

So to reiterate, I have a stream/flow of frames like this: 🖼🖼🖼🖼😦😦😦😦😦🖼🖼🖼🖼🖼🖼🖼🖼🖼🖼🖼🖼😎😎😎😎😎😎😎🖼🖼🖼🖼🖼🖼🖼😦😦😦🖼🖼🖼🖼🖼🖼🖼🖼🖼🛑

where 🖼 are video frames with no objects detected in them, 😎 and 😦 are video frames that do contain some detected objects, and 🛑 is the end of a stream.

The operator would take that overall flow of video frames and give me a Flow<Flow<Bitmap>> where the outer flow would emit 3 inner flows like this:

😦😦😦😦😦🛑 😎😎😎😎😎😎😎🛑 😦😦😦🛑

then the outer flow would also 🛑

The reason is that I want to collect the outer flow and feed the frames into a video encoder. If I flat-map the entire stream and feed that into the encoder, I get one clip containing every frame that has a detection in it. If I instead collect the outer flow and send each clip (inner flow) to the encoder separately, I get 3 separate clips from the one overall video stream.

Flockavelli commented 2 years ago

I am struggling to figure out a way to do this:


typealias FramesWithDetections = Pair<Bitmap, List<Detection>>

private val processedFrames: Flow<FramesWithDetections> =  processFrames()

val clips: Flow<Flow<FramesWithDetections>> = callbackFlow {
    val frameQueue = Channel<FramesWithDetections>()
    val frameQueueFlow = frameQueue.receiveAsFlow()
    launch { processedFrames.collect { frameQueue.send(it) } }

    fun getNextClip() = frameQueueFlow
        .dropWhile { (frame, detections) -> detections.isEmpty() }
        .takeWhile { (frame, detections) -> detections.isNotEmpty() }

    while (true) {
        trySend(getNextClip())
    }
}

Something like that is what I am thinking of, but I realize the while(true) loop is not going to wait for the first inner flow to be drained before emitting the next one...

I think I need channels, since those act like non-blocking queues and I think I can configure them so that subscribers can come and go without frames being dropped, as would happen with a SharedFlow<T>. The risk is that if frames get dropped between re-subscriptions, the clipping done by .dropWhile{...}.takeWhile{...} would drop entire clips.

lowasser commented 2 years ago

Flow<Flow<T>> doesn't make sense, especially because Flows are generally reusable -- what happens when you rerun the internal flows?

The API you will probably need to end up with is a Flow<List<T>>.

Volatile-Memory commented 2 years ago

With a Flow<List<T>> there would be latency to deal with. If a dog walks into the frame for a minute, it would be at least a minute before you could respond to it.

Rerunning the flows may not be a useful use case here, so I haven't thought that through; what is the issue there?

Volatile-Memory commented 2 years ago

Also, channels are meant for streams where each item is consumed once and is not dropped if there are no subscribers, and they can be exposed as flows through the consumeAsFlow() and receiveAsFlow() extension functions, so I'm not sure I agree that the flow concept is generally guaranteed to be reusable.
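
To make the reusability point concrete, here is a minimal sketch, assuming a recent kotlinx.coroutines version. The function name `collectTwice` is hypothetical and only used for illustration. It shows that a flow obtained from `consumeAsFlow()` can be collected once, and that a second collection is rejected with an `IllegalStateException`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what the first collection saw and whether
// a second collection of the same flow was rejected.
fun collectTwice(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { channel.send(it) } // queue 0, 1, 2 with no collector present
    channel.close()

    val frames = channel.consumeAsFlow()
    val first = frames.toList() // drains the queued elements
    val secondRejected = try {
        frames.toList()
        false
    } catch (e: IllegalStateException) {
        // consumeAsFlow() allows only a single collection
        true
    }
    first to secondRejected
}
```

With `receiveAsFlow()` a second collection is allowed, but the collectors then share the channel's elements (fan-out) rather than each seeing all of them, so neither variant behaves like a re-runnable cold flow.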

lowasser commented 2 years ago

When you say that you're struggling to find a way to do this, I think that's because you're working against the way the abstraction is designed, whether or not it's technically legal.

Volatile-Memory commented 2 years ago

That may be right, but similar windowing operators exist in Rx, and I've seen issues here proposing to bring them to suspending flows. They don't seem that different, and they can be quite useful.

Could you explain what you think is wrong with the nested flows? 🤔

psteiger commented 2 years ago

Hi @Flockavelli ,

what about:

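import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow = "****AAAA***BCDEF*****GGGE***".toList().asFlow()
    // Print each recognized run as a list.
    flow.recognizedFlows().collect { println(it.toList()) }
}

fun Flow<Char>.recognizedFlows(): Flow<Flow<Char>> = flow {
    var sequential = mutableListOf<Char>()
    collect {
        if (it != '*') {
            // Part of a run: buffer it.
            sequential.add(it)
        } else if (sequential.isNotEmpty()) {
            // A '*' ends the run: emit the buffered elements as a flow.
            emit(sequential.asFlow())
            sequential = mutableListOf()
        }
    }
    // Flush a trailing run when the input does not end with '*'.
    if (sequential.isNotEmpty()) emit(sequential.asFlow())
}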

Pretty rudimentary and probably needs some more work, but seems to work for non-corner-cases at least. Result above is:

[A, A, A, A]
[B, C, D, E, F]
[G, G, G, E]

P.S. nice graphical representation of the problem.

Flockavelli commented 2 years ago

@psteiger that fulfills the API, but the inner flows still have the latency issue, and they have the problem of a potentially unbounded buffer.

You would only get the first emission from an inner flow after the upstream has already produced all of that inner flow's elements. For example, if you have a 10-minute video and, 1 minute in, a dog starts walking around for 4 minutes before exiting the frame, your recognizedFlows operator would emit nothing until 5 minutes into the original flow.

All the while, the operator would keep growing the mutable list (the buffer), with no guarantee that it will be drained anytime soon.

So if you are buffering 4K video frames for as long as there is a detection, you consume more and more memory, hoping the dog leaves the frame before you hit an OutOfMemoryError.

If the outer Flow<Flow<T>> emitted each inner flow as soon as a detection started, you might be able to process the frames fast enough to avoid this, or employ some other backpressure strategy.

Simply buffering to a list and emitting it as a flow is likely a no-go. I think this would require channels or some other way to queue up frames as they become available.
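
A minimal sketch of that channel-backed idea follows. It is not an existing kotlinx.coroutines operator: the names `chunkedWhile` and `clipsOf` are hypothetical, and the `'*'` convention is borrowed from the earlier example. Each inner flow is backed by a rendezvous channel and is emitted as soon as the first matching element arrives. The sketch assumes the collector handles every inner flow in its own coroutine (collecting an inner flow inline would deadlock), and it does not handle an inner collector that cancels early.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator: splits the upstream into runs of consecutive
// elements that satisfy `predicate`, one inner flow per run.
fun <T> Flow<T>.chunkedWhile(predicate: (T) -> Boolean): Flow<Flow<T>> = flow {
    var current: Channel<T>? = null
    try {
        collect { value ->
            if (predicate(value)) {
                // First element of a run: open a channel and emit its flow right away.
                val channel = current ?: Channel<T>(Channel.RENDEZVOUS).also {
                    current = it
                    emit(it.consumeAsFlow())
                }
                // Rendezvous send: suspends until the inner collector takes the element,
                // so nothing is buffered and the upstream is back-pressured.
                channel.send(value)
            } else {
                // The run ended: closing the channel completes the inner flow.
                current?.close()
                current = null
            }
        }
    } finally {
        // Complete a run that is still open when the upstream ends or fails.
        current?.close()
    }
}

// Hypothetical helper for demonstration: gathers every run into a list,
// collecting each inner flow in its own coroutine.
fun clipsOf(input: String): List<List<Char>> = runBlocking {
    val clips = mutableListOf<Deferred<List<Char>>>()
    input.toList().asFlow()
        .chunkedWhile { it != '*' }
        .collect { clip -> clips += async { clip.toList() } }
    clips.awaitAll()
}
```

Calling `clipsOf("****AAAA***BCDEF*****GGGE***")` should yield the three runs AAAA, BCDEF and GGGE. The rendezvous channel trades throughput for bounded memory: the upstream can never get more than one element ahead of the inner collector.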

psteiger commented 2 years ago

@Flockavelli I see.

1) you want to emit the inner flow as soon as the first element appears, but at this moment you don't know when it will end, so you could immediately emit a hot flow and dynamically update it and terminate it.

2) once outer flow collection starts, you don't want to collect your inner flow and emit to it on the same coroutine, or it would suspend indefinitely (deadlock). You can't emit to the inner flow while the coroutine is suspended waiting for... inner flow emissions. So I don't see a way to run your use case without buffers, and if you have buffers, you have potential to miss updates (unless you use an "unbounded" buffer, then bound would be memory capacity, or the max buffer size of whatever API is used).

So something along the following lines would fix the latency issue, but for the buffer issue, well, looks like you will need a buffer and a back pressure strategy.

suspend fun main() = coroutineScope {
    val flow = "****AAAA***BCDEF*****GGGE***".toList().asFlow()
    val result = flow.recognizedSharedFlows().onEach {
        launch { println(it.toList()) }
    }.launchIn(this)
}

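// Replay cache of 64: a subscriber that arrives late still receives up to the 64 most recent elements.
fun createInnerSharedFlow() = MutableSharedFlow<Char>(replay = 64, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)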

fun Flow<Char>.recognizedSharedFlows(): Flow<Flow<Char>> {
    val terminateToken = '$'
    var sharedFlow: MutableSharedFlow<Char>? = null
    var sharedFlowEmitted = false
    return flow { 
        collect { char ->
            if (char != '*') {
                if (!sharedFlowEmitted) {
                    sharedFlow = createInnerSharedFlow().also {
                        emit(it.takeWhile { it != terminateToken }) // emit inner (hot) flow
                        sharedFlowEmitted = true
                    }
                }
                sharedFlow!!.emit(char) // emit to the inner flow
            } else {
                if (sharedFlowEmitted) {
                    sharedFlow!!.emit(terminateToken) // terminate the inner flow
                    sharedFlow = null
                    sharedFlowEmitted = false
                }
            }
        }
    }.buffer()
}

Output is the same, but now inner flows are emitted upon appearance of the first element.

[A, A, A, A]
[B, C, D, E, F]
[G, G, G, E]

If you remove both .buffer() at the end of recognizedSharedFlows and the launch { } inside onEach in main(), it deadlocks.

Volatile-Memory commented 2 years ago

that's a neat solution

psteiger commented 2 years ago

Hummm, thinking deeper about the buffer problem, I think you can get away with using no buffer after all, as long as you just run the outer flow collection and inner flow collection on different coroutines, and use a back pressure strategy of SUSPEND for the inner hot flows.

Then, one coroutine will be suspending during inner flow collection while the coroutine that collects the outer flow and emits to inner flows would not be affected (actually, it would need to suspend once waiting for first subscriber on inner flow to trigger the back pressure strategy, see below).

It seems it would be hard to make sure the coroutines that collect the inner flows do so serially without a channel, though.

suspend fun main() {
    "****AAAA***BCDEF*****GGGE***"
        .toList()
        .asFlow()
        .doOnEachRecognized(
            onEach = { println(it) }, 
            onEnd = { println("end") }
        )
}

suspend fun Flow<Char>.doOnEachRecognized(
    onEach: suspend (Char) -> Unit,
    onEnd: suspend FlowCollector<Char>.(Throwable?) -> Unit
) = coroutineScope {
    val channel = Channel<Flow<Char>>(Channel.UNLIMITED)
    launch {
        for (innerFlow in channel) {
            innerFlow
                .onEach(onEach)
                .onCompletion(onEnd)
                .collect()
        }
    }
    recognizedSharedFlows()
        .onEach { channel.send(it) }
        .onCompletion { channel.close() }
        .launchIn(this)
}

fun createInnerSharedFlow() = MutableSharedFlow<Char>(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)

fun Flow<Char>.recognizedSharedFlows(): Flow<Flow<Char>> {
    val terminateToken = '$'
    var sharedFlow: MutableSharedFlow<Char>? = null
    var sharedFlowEmitted = false
    return flow { 
        collect { char ->
            if (char != '*') {
                if (!sharedFlowEmitted) {
                    sharedFlow = createInnerSharedFlow().also {
                        emit(it.takeWhile { it != terminateToken })
                        sharedFlowEmitted = true
                    }
                }
                // Before emitting, suspend until the inner flow has at least 1 subscriber.
                // With 0 subscribers, emit() returns immediately regardless of onBufferOverflow
                // and the value would be lost.
                sharedFlow!!.subscriptionCount.first { it > 0 }
                sharedFlow!!.emit(char)
            } else {
                if (sharedFlowEmitted) {
                    sharedFlow!!.emit(terminateToken)
                    sharedFlow = null
                    sharedFlowEmitted = false
                }
            }
        }
    }
}

Note the absence of buffer() and no replay and no extraBufferCapacity on inner flows.

Output:

A
A
A
A
end
B
C
D
E
F
end
G
G
G
E
end
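
The wait on subscriptionCount in the code above matters because a MutableSharedFlow without replay drops values emitted while nobody is subscribed. A minimal sketch of that behaviour, using a hypothetical function name `emitBeforeAndAfterSubscribing`:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values a subscriber actually receives.
fun emitBeforeAndAfterSubscribing(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0

    // No subscriber yet: emit() returns immediately and the value is dropped.
    shared.emit(1)

    // Start a subscriber that completes after two values.
    val received = async { shared.take(2).toList() }

    // Suspend until the subscriber is registered, as in the code above.
    shared.subscriptionCount.first { it > 0 }

    // With a subscriber and no buffer, each emit() suspends until the value is taken.
    shared.emit(2)
    shared.emit(3)
    received.await()
}
```

The subscriber sees 2 and 3 but never 1, which is the loss that the subscriptionCount check guards against.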