Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Feature Request: add a hot flow abstraction that can be complete()d or error()ed from the emitter side #4008

Open climategadgets opened 8 months ago

climategadgets commented 8 months ago

Use case

Any hot flow of limited size, or unlimited flow that can complete abnormally. Examples:

The Shape of the API

Current Behavior

Desired Behavior

Prior Art

Project Reactor Sinks.Many tryEmitComplete(), tryEmitError(error)

Note about "non-example" remark from the issue template

This feature is requested not because it is present in Reactor, but because it is a logical extension of flow behavior. This behavior is already implicitly supported by cold flows (complete() happens when the flow's source runs out of elements, and error() happens when an exception is thrown from within the flow source) and by channels. Having this feature in hot flows will make the behavior uniform across different flow kinds (cold and hot) and make channel adapters behave in a more predictable way.
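
A minimal sketch of the implicit termination described above, assuming a recent kotlinx.coroutines release. The function names completing() and failing() are made up for illustration: a cold flow completes when its builder returns and fails when the builder throws, while a channel is terminated explicitly from the emitter side with close() or close(cause).

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Cold flow: completion is implicit, it happens when the builder block returns.
fun completing(): Flow<Int> = flow {
    emit(1)
    emit(2)
}

// Cold flow: an exception thrown inside the builder is rethrown to the collector.
fun failing(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("source failed")
}

fun main() = runBlocking<Unit> {
    // toList() can return only because the flow completes.
    println(completing().toList()) // [1, 2]

    try {
        failing().collect { }
    } catch (e: IllegalStateException) {
        println("flow failed: ${e.message}")
    }

    // Channel: the emitter side ends the stream explicitly.
    val completed = Channel<Int>()
    launch {
        completed.send(1)
        completed.close() // normal completion
    }
    for (x in completed) println(x) // the loop ends once the channel is closed

    val failed = Channel<Int>()
    launch { failed.close(IllegalStateException("emitter failed")) } // abnormal completion
    try {
        for (x in failed) println(x)
    } catch (e: IllegalStateException) {
        println("channel failed: ${e.message}")
    }
}
```

MutableSharedFlow offers no counterpart to either close() call, which is the gap this request is about.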

dkhalanskyjb commented 8 months ago

Could you provide some code examples of how you'd use this feature? It's unclear why the things you list as the use cases are not better expressed via callbackFlow or flow { }.shareIn.

dovchinnikov commented 7 months ago

// available to send from multiple coroutines
val events = Channel<Event>() 

// no broadcast channel anymore, let's use flow
val broadcaster = flow {
  for (e in events) {
    emit(e)
  }
}.shareIn(...)

// a subscription
launch {
  broadcaster.collect { e ->
    handle(e)
  }
}

From shareIn docs:

Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers

which means it's not possible to let a subscriber know that there will be no more items. events.close() does not make collect return, so the collector coroutine cannot complete.
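
The behavior can be observed with a small experiment. This is only a sketch: the function name closeUpstreamThenWait, the String event type, the 500 ms timeout, and the choice of SharingStarted.Eagerly are all assumptions made for the demonstration, since the snippet above elides the shareIn arguments.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns what the subscriber received and whether it finished on its own
// after the upstream channel was closed.
fun closeUpstreamThenWait(): Pair<List<String>, Boolean> = runBlocking {
    val events = Channel<String>()
    val sharingScope = CoroutineScope(SupervisorJob())
    val broadcaster = flow { for (e in events) emit(e) }
        .shareIn(sharingScope, SharingStarted.Eagerly)

    val received = mutableListOf<String>()
    val subscribed = CompletableDeferred<Unit>()
    val subscriber = launch {
        broadcaster
            .onSubscription { subscribed.complete(Unit) } // runs once the subscription is registered
            .collect { received += it }
    }

    subscribed.await()
    events.send("a")
    events.close() // upstream completes, but subscribers are not told

    // join() would return if collect() ever returned; here it is expected to time out.
    val finished = withTimeoutOrNull(500) { subscriber.join() } != null

    subscriber.cancel()
    sharingScope.cancel()
    received.toList() to finished
}

fun main() {
    val (received, finished) = closeUpstreamThenWait()
    println("received=$received, subscriber finished on its own: $finished")
}
```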

Sure, it's possible to define a sealed event hierarchy like:

// available to send from multiple coroutines
val events = Channel<Event>() 

// assumes the nested classes are imported, e.g. import EventOrCompletion.*
sealed class EventOrCompletion {
  class ActualEvent(val evt: Event) : EventOrCompletion()
  class FailedOrCancelled(val t: Throwable) : EventOrCompletion()
  data object Completed : EventOrCompletion()
}

val broadcaster = flow<EventOrCompletion> {
  try {
    for (e in events) {
      emit(ActualEvent(e))
    }
    emit(Completed)
  }
  catch (t: Throwable) {
    emit(FailedOrCancelled(t))
    throw t
  }
}.shareIn(...)

// a subscription
launch {
  broadcaster
    .takeWhile {
      it != Completed
    }
    .map {
      when (it) {
        is ActualEvent -> it.evt
        is FailedOrCancelled -> throw it.t
        else -> error("unreachable")
      }
    }
    .collect { e: Event ->
      handle(e)
    }
}

But it requires duplicate work on each subscriber side, and it still raises a question: why does Channel have it embedded, and why is MutableSharedFlow not consistent with Channel?
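
One way to avoid repeating that work in every subscriber is to factor it into a pair of reusable operators. The sketch below is an illustration only: Signal, materialize() and dematerialize() are hypothetical names borrowed from Rx terminology and are not part of kotlinx.coroutines, and SharingStarted.Lazily is chosen so that the single subscriber in the demo sees every value.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper that turns termination into ordinary values.
sealed interface Signal<out T> {
    data class Next<T>(val value: T) : Signal<T>
    data class Failed(val cause: Throwable) : Signal<Nothing>
    data object Done : Signal<Nothing>
}

// Emitter side: wrap values, then append Done or Failed when the upstream ends.
fun <T> Flow<T>.materialize(): Flow<Signal<T>> =
    map<T, Signal<T>> { Signal.Next(it) }
        .onCompletion { cause -> if (cause == null) emit(Signal.Done) }
        .catch { cause -> emit(Signal.Failed(cause)) }

// Subscriber side: unwrap values, complete on Done, rethrow on Failed.
fun <T> Flow<Signal<T>>.dematerialize(): Flow<T> =
    takeWhile { it !is Signal.Done }
        .map { signal ->
            when (signal) {
                is Signal.Next -> signal.value
                is Signal.Failed -> throw signal.cause
                Signal.Done -> error("unreachable: filtered out by takeWhile")
            }
        }

fun main() = runBlocking<Unit> {
    val sharingScope = CoroutineScope(SupervisorJob())

    val ok = flowOf(1, 2, 3).materialize().shareIn(sharingScope, SharingStarted.Lazily)
    println(ok.dematerialize().toList()) // [1, 2, 3]

    val failing = flow<Int> { emit(1); throw IllegalStateException("upstream failed") }
        .materialize()
        .shareIn(sharingScope, SharingStarted.Lazily)
    println(runCatching { failing.dematerialize().toList() }.exceptionOrNull())

    sharingScope.cancel()
}
```

This still leaves the wrapper type visible in the shared flow's signature, so it reduces the duplication without answering the consistency question.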

climategadgets commented 7 months ago

Could you provide some code examples of how you'd use this feature? It's unclear why the things you list as the use cases are not better expressed via callbackFlow or flow { }.shareIn.

Code Examples

One of the recent examples I worked with would be here. This project contains lots of other examples, search for .complete() or sink.error() (case insensitive).

callbackFlow

Per documentation, it is a cold flow. The model can probably be coerced to behave something like a completable and errorable hot flow, but it'll take a stretch, and the question stays - why is the stretch necessary if the concept of a reactive stream (which it very much looks like Flow tries to implement) makes no difference between a cold and a hot flow?
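
To illustrate the "cold" part, here is a small sketch assuming a made-up callback-style source (TickSource and ticks() are hypothetical). Each collector triggers its own registration with the source, so completion via close() is per collector and is not a shared, emitter-driven event.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback-style source that delivers three ticks and then finishes.
class TickSource {
    val registrations = AtomicInteger()

    fun subscribe(onTick: (Int) -> Unit, onDone: () -> Unit) {
        registrations.incrementAndGet()
        (1..3).forEach(onTick)
        onDone()
    }
}

fun TickSource.ticks(): Flow<Int> = callbackFlow {
    subscribe(
        onTick = { trySend(it) }, // fits into callbackFlow's default buffer
        onDone = { close() }      // close(cause) would fail the collector instead
    )
    awaitClose { /* unregister from the source here if it supported that */ }
}

fun main() = runBlocking<Unit> {
    val source = TickSource()
    val ticks = source.ticks()

    println(source.registrations.get()) // 0: nothing happens until the flow is collected
    println(ticks.toList())             // [1, 2, 3]
    println(ticks.toList())             // [1, 2, 3] again, from a second registration
    println(source.registrations.get()) // 2
}
```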

.shareIn

Per documentation, .shareIn produces a SharedFlow which, as the original request states, explicitly supports neither completion nor erroring out.

P.S.: to the point @dovchinnikov makes - he actually presented a solution identical to the one presented by my colleague when I raised this question at work. Yes, it is pretty trivial to implement, but, why do hot and cold flows behave in a different way? Why do flows and channels behave in a different way? Flow to channel and channel to flow transformations present in Kotlin itself are nice, but the concept is broken as long as this inconsistency stays. Think of how many development teams are forced to bridge this gap in subtly different ways (creating subtly different bugs), and how much faster all of us would get to the solution if a uniform solution was available.

P.P.S.: A cursory search reveals another almost identical solution: https://stackoverflow.com/questions/75856447/kotlin-sharedflow-and-catching-exceptions. "Almost identical" is a bane: I have seen cases where people, even inside the same big team, use boilerplate code that turns out to be almost the same but with different quirks. Later, developers make errors because they rely on assumptions based on occurrences they have previously seen, which no longer hold for other occurrences of the same pattern.

climategadgets commented 7 months ago

One more thing - here, cold streams (.fromIterable()) are routinely used in place of hot streams for testing. The need to have .complete() and .error() becomes more obvious if one tries to do the same with cold and hot flows.

dkhalanskyjb commented 7 months ago

Also: https://github.com/Kotlin/kotlinx.coroutines/issues/4019 Rephrasing that issue, there's no way to make flattenMerge work correctly for Flow<StateFlow<_>>: after concurrency StateFlow instances have finished their work, nothing else will be emitted.

yoxjames commented 6 months ago

The replicate enhancement I logged somewhat touches on this topic. Replicated flows would be cold but able to be shared with multiple consumers. Ending them is possible via throwing or the replicated flow simply terminating normally. This accomplishes something like sharing HTTP or disk IO without creating a never ending SharedFlow.

I am not sure if this design covers all the use cases you have in mind (it does not include an emitter-side close() proposal), but it seemed somewhat related. Also, this design was for situations where the total number of collectors is known rather than unknown. I am not sure if that would perfectly capture the use cases you had in mind.

https://github.com/Kotlin/kotlinx.coroutines/issues/3833

elizarov commented 6 months ago

The original design issue for the shared flows goes to great lengths in explaining the rationale for this specific design and contrasts it with the design in reactive libraries like Rx and Reactor. Please, see here: https://github.com/Kotlin/kotlinx.coroutines/issues/2034

The following operators (not implemented yet) are supposed to solve the cases where you need to have a shared flow with support for errors: https://github.com/Kotlin/kotlinx.coroutines/issues/2092