Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Flow.shareIn and stateIn operators #2047

Closed elizarov closed 4 years ago

elizarov commented 4 years ago

This issue supersedes #1261 and is based on the SharedFlow #2034 and StateFlow #1973 framework. See #2034 for most of the conceptual details on shared flows.

Introduction

A MutableSharedFlow provides a convenient means to own a shared flow of values that other parts of the code can subscribe to. However, it is often useful to take an existing cold Flow that is defined in some other piece of code and start sharing it by launching a coroutine that collects this upstream flow and emits into the shared flow.

shareIn operator

The shareIn operator is introduced:

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0,
): SharedFlow<T>

It has the following parameters:

scope - the coroutine scope in which the sharing coroutine is launched.
started - the strategy that controls when sharing is started and stopped.
replay - the number of values replayed to new subscribers (zero by default, meaning no replay).
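For illustration, a minimal usage sketch (the backendMessages() upstream flow is an assumption made for this example, not part of the proposal):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

// Assumed for illustration: a cold flow that opens a network connection when collected.
fun backendMessages(): Flow<String> = flow { /* connect and emit messages */ }

fun messages(scope: CoroutineScope): SharedFlow<String> =
    backendMessages().shareIn(
        scope = scope,                              // the sharing coroutine is launched in this scope
        started = SharingStarted.WhileSubscribed(), // upstream is collected only while subscribers exist
        replay = 1                                  // new subscribers immediately get the latest message
    )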

Starting sharing

There are the following out-of-the-box implementations of SharingStarted:

SharingStarted.Eagerly - sharing is started immediately and never stops.
SharingStarted.Lazily - sharing is started when the first subscriber appears and never stops.
SharingStarted.WhileSubscribed(...) - sharing is started when the first subscriber appears and is stopped when the last subscriber disappears.

The WhileSubscribed strategy is a function with optional parameters:

fun SharingStarted.Companion.WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted

All the above values and the WhileSubscribed function are defined in the SharingStarted companion object. This way, additional values/functions can be defined as extensions. Variants of the WhileSubscribed function that work with different representations of duration (java.time.Duration, kotlin.time.Duration, Long, TimeUnit) will be defined by the library in the appropriate modules.
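For example, a typical configuration keeps the upstream alive for a short while after the last subscriber disappears (a sketch; the 5-second timeout is an arbitrary illustrative value):

val started = SharingStarted.WhileSubscribed(
    stopTimeoutMillis = 5_000,              // keep the upstream running for 5s after the last subscriber leaves
    replayExpirationMillis = Long.MAX_VALUE // never expire the replay cache (the default)
)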

stateIn operator

As StateFlow is a specialized version of SharedFlow, the stateIn operator is a specialized version of the shareIn operator. It does not have a replay parameter (it is always equal to 1 for a state flow) and has a required initialValue:

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

When execution happens in a suspending context and you want to compute and wait for the initial value of the state to arrive from the upstream flow, there is a suspending variant of stateIn without an initial value and with the hard-coded started = Eagerly:

suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
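A hedged sketch of both variants (the watchCounter() upstream is assumed for illustration):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

// Assumed upstream for illustration.
fun watchCounter(): Flow<Int> = flow { /* emits counter updates */ }

// Non-suspending variant: an explicit initial value is required.
fun counterState(scope: CoroutineScope): StateFlow<Int> =
    watchCounter().stateIn(scope, SharingStarted.WhileSubscribed(), initialValue = 0)

// Suspending variant: waits for the first upstream value to become the initial state.
suspend fun counterStateAwaitingFirst(scope: CoroutineScope): StateFlow<Int> =
    watchCounter().stateIn(scope)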

Custom starting strategies

SharingStarted is an interface that supports 3rd-party implementations, allowing any starting strategy to be plugged into the sharing operators:

interface SharingStarted {
    fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}

An implementation of SharingStarted provides a single function command that transforms the subscriptionCount of the shared flow into a flow of commands that control the sharing coroutine and are represented by the SharingCommand enum:

enum class SharingCommand { START, STOP, STOP_AND_RESET_REPLAY_CACHE }
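For example, a hypothetical custom strategy (illustration only, not part of the proposal) could start sharing only once a minimum number of subscribers is present:

import kotlinx.coroutines.flow.*

// Starts sharing when at least `threshold` subscribers are present and stops
// (keeping the replay cache) when the count drops below the threshold.
class StartedWithThreshold(private val threshold: Int) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .map { count -> if (count >= threshold) SharingCommand.START else SharingCommand.STOP }
            .distinctUntilChanged()
}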

Error handling

Any error in the upstream flow cancels the sharing coroutine and resets the buffer of the shared flow. The error is delivered to the scope. If this behavior is not desirable, then error-handling operators (such as retry and catch) should be applied to the upstream flow before shareIn operator. If the upstream completes normally, then nothing happens.
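A hedged sketch of applying error handling upstream (the shareWithRecovery name and the retry policy are illustrative assumptions, not part of the proposal):

import java.io.IOException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.shareWithRecovery(scope: CoroutineScope): SharedFlow<T> =
    retry(3) { e -> e is IOException }                 // retry transient I/O failures upstream
        .catch { e -> println("upstream failed: $e") } // handle the error so the sharing coroutine is not cancelled
        .shareIn(scope, SharingStarted.Eagerly)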

Conceptual implementation

The conceptual implementation of shareIn operator is simple:

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val upstream = this
    val shared = MutableSharedFlow<T>(
        replay = replay, 
        extraBufferCapacity = maxOf(replay, DEFAULT_BUFFER_SIZE) - replay, 
    )
    scope.launch { // the single coroutine to rule the sharing
        started.command(shared.subscriptionCount)
            .distinctUntilChanged()
            .collectLatest { // cancels block on new emission
                when (it) {
                    SharingCommand.START -> upstream.collect(shared) // can be cancelled
                    SharingCommand.STOP -> { /* just cancel and do nothing else */ }
                    SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> shared.resetReplayCache()
                }
            }
    }
    return shared
}

Note that the buffer is padded to a minimal default capacity (64) for performance reasons.

Operator fusion

The actual implementation of the shareIn operator is more complex. It fuses with immediately preceding flowOn operators, directly launching the sharing coroutine in the corresponding context (without an additional coroutine and channel to change the context).

It also fuses with immediately preceding Flow.buffer operators, which allows for explicit configuration of the buffer size, creating a shared flow that takes the configured size into account:

Note that the last fusion, conflate().shareIn(scope, started, r), is somewhat at odds with the true spirit of the fusion concept: it produces execution results that differ from a separate, sequential application of the conflate and shareIn operators without fusion. For example, take an upstream flow that emits consecutive numbers starting from 1 with a delay of 100ms between numbers, and a slow downstream subscriber taking 530ms for each value. A conflate() followed by a buffer-1 shareIn without fusion collects the values 1, 2, 3, 6, 11, 16, etc. (three initial values collected due to double-buffering by conflation and sharing) and would buffer even more initial values if a default buffer size were used. A fused implementation, however, has a single small buffer and collects 1, 6, 11, 16, etc., just as expected from a separate conflate() without sharing. The fused buffer decreases the overall latency, processing the number 6 as soon as possible at the 1060ms mark after start, as opposed to the non-fused version, where the number 6 would be collected at the 2120ms mark. We consider this to be a good thing.
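For illustration, hedged sketches of the fused forms (upstream, scope, and started are assumed parameters; this is a sketch, not the actual implementation):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*

fun <T> fusionExamples(upstream: Flow<T>, scope: CoroutineScope, started: SharingStarted) {
    // flowOn fusion: the sharing coroutine itself runs in Dispatchers.IO,
    // with no extra coroutine or channel to change the context.
    val onIo: SharedFlow<T> = upstream.flowOn(Dispatchers.IO).shareIn(scope, started)

    // buffer fusion: the shared flow uses the explicitly configured buffer size
    // instead of the default padding.
    val buffered: SharedFlow<T> = upstream.buffer(16).shareIn(scope, started)

    // conflate fusion: a single small buffer keeping only the latest value,
    // as described in the paragraph above.
    val latestOnly: SharedFlow<T> = upstream.conflate().shareIn(scope, started)
}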

Implementation

Implementation is in PR #2069.

zach-klippenstein commented 4 years ago

I love the SharingStarted design, seems really powerful.

SharingStarted.WhileSubscribed - … It resets the cache replay buffer to the initial value when subscribers disappear.

Just to confirm, opting in to preserve the cache between upstream collections (ala RxReplayingShare) is as simple as filtering RESET_BUFFER commands from the Flow produced by WhileSubscribed, right?

elizarov commented 4 years ago

Just to confirm, opting in to preserve the cache between upstream collections (ala RxReplayingShare) is as simple as filtering RESET_BUFFER commands from the Flow produced by WhileSubscribed, right?

@zach-klippenstein Filtering does not work, since WhileSubscribed only sends RESET_BUFFER (which also stops). You better use WhileSubscribed(cacheExpirationMillis = Long.MAX_VALUE). Now, if that becomes a popular choice, then we can also provide a short-cut option for it. The only challenge is to pick a name for it.

pacher commented 4 years ago

Can you please elaborate a little bit more on error handling?

The error is delivered to the scope.

But what happens to collectors (launched in different scopes)? As far as I can understand they will hang there forever. Materialized error will become regular emission and will not be delivered to the scope. Also upstream will not be unsubscribed unless #2042 is implemented and applied to materialized flow.

Same goes for completion. SharedFlow explicitly never completes, but shareIn could be applied to finite cold flow and there is no word in the design about this case. So as far as I can understand all the collectors will hang there forever after upstream completes.

P.S. SharingStarted design is really powerful and flexible, kudos for that!

elizarov commented 4 years ago

Can you please elaborate a little bit more on error handling? But what happens to collectors (launched in different scopes)? As far as I can understand they will hang there forever.

@pacher Yes. The basic rule of a shared flow is that a subscriber never completes. We'll have it written very near the top of the shared flow docs and repeat it in the stateIn docs, too.

So what does it mean in practice? It depends on the error-handling policy of the app:

elizarov commented 4 years ago

shareIn could be applied to finite cold flow and there is no word in the design about this case

@pacher During design and community review we have not found use-cases for sharing finite-size cold flows with subscribers whose number is not known in advance. We have found some use-cases for replicating a finite-size cold flow into a fixed number of flows that is known in advance. This case is robust with respect to error propagation, since it is syntactically scoped in nature, and we plan to address it separately with a dedicated operator for flow replication that propagates both completion and errors.

pacher commented 4 years ago

@elizarov Thanks for the clarifications, very helpful. I've never done Android, and "crashing the app" is not an option for me. Maybe I am in the wrong place and not your target audience, which is a pity because I like what you do here.

I do believe that materialization should be baked in, maybe as another operator which uses shareIn under the hood. I was trying to show that materialization could be trickier than it seems. For example, another riddle: how to make a materialized shareIn deliver only the error to new collectors if it "failed", rather than replaying the whole buffer up to the error. For me it's just a riddle, I don't have a use-case for replay > 1 yet.

It's nice to see that you are still thinking about replicate. For a moment it felt like you were starting to get rid of error propagation in general, which is one of the greatest aspects of reactive design in my opinion.

P.S. I remember how proud Andrey looked in his keynote when he talked about Kotlin being used in banks. I can't imagine them being happy with "crashing the app" either ;-)

pacher commented 4 years ago

* critical bug in your code like a `NullPointerException` may accidentally sit forever materialized

Materializing the error could be done along the lines of

upstream
    .catch {
        emit(materializedError)
        throw it
    }

This way it would be delivered both downstream and to the scope.
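A slightly fuller hedged sketch of the same idea (the Materialized wrapper and all names here are illustrative, not part of the proposal):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

// Values and failures travel downstream as regular emissions.
sealed class Materialized<out T> {
    data class Value<T>(val value: T) : Materialized<T>()
    data class Failure(val cause: Throwable) : Materialized<Nothing>()
}

fun <T> Flow<T>.shareMaterialized(scope: CoroutineScope): SharedFlow<Materialized<T>> =
    map<T, Materialized<T>> { Materialized.Value(it) }
        .catch { e ->
            emit(Materialized.Failure(e)) // delivered downstream (and kept in the replay cache, if any)
            throw e                       // still cancels the sharing coroutine and notifies the scope
        }
        .shareIn(scope, SharingStarted.Eagerly, replay = 1)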

elizarov commented 4 years ago

πŸ“£ There is an important question on the design of sharing operators we need community help with. We need to figure out what should be the default behavior of SharingStarted.WhileSubscribed with respect to cache reset. We feel that the default should be based on the most common usage.

The question only matters for an upstreamFlow.shareIn(scope, replay) where replay >= 1 or for stateIn operator (where replay is always implicitly equal to one). The intended use for started = WhileSubscribed is a case when an upstream flow is expensive to maintain (uses network resources, device connection, or something like that) and you want to maintain a running upstream flow only when there is at least one downstream subscriber present. Now, when all subscribers disappear, we have two variants of behavior to pick as our default:

πŸ‘ Immediately reset the cache to the initial value (if it was specified; clear if it was not) so that the next time subscribers appear they will not get stale value(s) from the previous upstream flow collection but will receive some kind of initial value (or an empty flow) to explicitly tell them to wait while upstream emits anything (establishes network connection and gets data, etc)

πŸš€ Keep the last value(s) emitted by the upstream so that the next time subscribers appear they will immediately get previously emitted last value(s) without having to wait until upstream flow emits anything.

Let's do a quick poll: What do you use in your code most often and want to see as the default?

ZakTaccardi commented 4 years ago

I haven't really used Rx in over 2 years, since I've been using coroutines, and I can't remember in which scenarios I preferred πŸ‘ vs πŸš€. I would love to hear more use cases about where everyone would prefer one over the other, and vice versa.

πŸ‘ Immediately reset the cache to the initial value (if it was specified; clear if it was not)

Downside: less performant than quickly emitting a cached value

πŸš€ Keep the last value(s) emitted by the upstream

Downside: unexpected weird buggy behavior. For example, with the network connection example, if you had a State.Loaded(..) that was cached, and an .onStart { State.Loading }, you would receive stale Loaded(..) with fully loaded data immediately, followed by an immediate State.Loading. This could result in a fully loaded UI of stale data being shown with a quick swap to a loading screen.

My personal opinion is that the πŸ‘ makes more sense as a default because it is safer, and for places where πŸš€ makes more sense, devs can opt-in to that less safe but potentially more performant behavior

JakeWharton commented 4 years ago

But move .onStart before the cache and you don't have that problem. The whole point of the cache is so you get the most recent value. If you're deliberately subverting the cache, the problem isn't the cache behavior, it's your order of operations. The state machine which starts with the loading state is an upstream source of states for the cache.
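A hedged sketch of that ordering (the State type and loadData() source are assumptions for illustration):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

sealed class State {
    object Loading : State()
    data class Loaded(val data: String) : State()
}

// Assumed source of freshly loaded data.
fun loadData(): Flow<String> = flow { emit("fresh data") }

fun uiState(scope: CoroutineScope): StateFlow<State> =
    loadData()
        .map<String, State> { State.Loaded(it) }
        .onStart { emit(State.Loading) } // emitted upstream of the cache, so the cache holds
                                         // Loading again whenever the upstream is restarted
        .stateIn(scope, SharingStarted.WhileSubscribed(), State.Loading)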

LouisCAD commented 4 years ago

@elizarov If you use replay with a value of 1 or greater, why would you want to not replay it but reset to the initial value instead?

To me, for shareIn, the πŸ‘ behavior makes sense with no replay (value of 0), and the πŸš€ behavior makes sense with replay (>= 1).

For stateIn, if there's a way to have it replace the last value with the initial value, I think the behavior should not have any default, to force being explicit about the behavior. That said, for stateIn, with regard to the StateFlow API, I'm not sure replacing the last value with the initial value is right, so I'd say it should always give the last value, and other use cases would use shareIn.

ZakTaccardi commented 4 years ago

If you're deliberately subverting the cache, the problem isn't the cache behavior it's your order of operations. The state machine which starts with the loading state is an upstream source of states from the cache.

I'd have to use .shareIn(..) more to understand it, but if you are building some type of state machine with a scan-style operation where your Flow<T> needs an initial state, then I'd expect it to be quite useful to have that initial state be the cached value.

ZakTaccardi commented 4 years ago

@elizarov for the poll, it might be useful to show an example of the .shareIn(..) parameters for the non-default behavior, i.e. how you would specify whether you want the cache to replay or reset in the two scenarios.

RobertKeazor commented 4 years ago

This is a bit of a tricky question. πŸš€ seems to represent the concept that this flow always maintains the most recent value of the state machine. But emitting a state that could be stale sounds pretty dangerous lol. Then πŸ‘ seems safer but breaks that concept... I agree @elizarov, safer is always better, as long as devs have a way to opt in to πŸš€

matejdro commented 4 years ago

Maybe a good solution for this could be to allow the developer to receive a close callback and transform the data before it goes into the "long-term cache"?

Going from Zak's example, one could have State.Loaded(data) and State.Loading(partialData). While the stream is in use, it would emit State.Loaded events. But after the stream closes, it could take the last data and put it into the partialData of State.Loading. That way, when a new subscriber arrives, it knows that the last data is actually stale, since the received state is Loading, not Loaded.

pacher commented 4 years ago

I would vote to remove WhileSubscribed and introduce two distinct strategies instead. As a side benefit, we would have good examples of how to deal with the replay buffer in our custom strategies and could use the source code of either as a starting point.

elizarov commented 4 years ago

UPDATE: Based on your feedback and having discussed the issue in the team we've decided to make the following changes to this proposed design:

I'll update this issue shortly. We're also changing the buffer terminology related to operator fusion. The shared flow design will be tweaked a bit, too.

elizarov commented 4 years ago

Draft PR with implementation πŸ‘‰ #2069

gajicm93 commented 4 years ago

I have a question about the CoroutineScope passed into the shareIn operator. Since a SharedFlow can be collected from many different scopes with variable lifecycle durations, must the scope passed into the shareIn operator be longer-lived than all the collector scopes?

For example, if I "shareIn" with a Fragment lifecycle scope, but then collect with GlobalScope, will that collector be canceled as soon as my Fragment is destroyed, even though I'm collecting inside GlobalScope?

Thanks.

elizarov commented 4 years ago

I have a question about the CoroutineScope passed into the shareIn operator. Since a SharedFlow can be collected from many different scopes with variable lifecycle durations, must the scope passed into the shareIn operator be longer-lived than all the collector scopes?

@gajicm93 It does not have to be. When the sharing scope is cancelled, the sharing coroutine stops. It means it no longer collects from the upstream, but it does not affect downstream subscribers. They can still be active, although they will not receive any further updates. Moreover, the shared flow's replay cache is still preserved, so new subscribers can appear and will get a snapshot of the replay cache, too.

gajicm93 commented 4 years ago

although they will not receive any further updates.

Thank you, this is the key takeaway for me. So essentially I will want the shareIn scope to be "wider" than any of the collectors.

1zaman commented 4 years ago

What about providing an operator to collect to an existing MutableSharedFlow with a SharingStarted strategy? This could be defined as a (suspending) terminal operator, and then we don't need to provide a scope as a parameter. Something like this (naming TBD of course):

suspend fun <T> Flow<T>.collectWhen(shared: MutableSharedFlow<T>, started: SharingStarted)

This would be useful for cases where there are multiple upstreams, or the upstream can change, but we still want to have the benefit of using a SharingStarted strategy.

Of course, there can be no buffer fusion here, since buffer configuration is immutable in MutableSharedFlow.

elizarov commented 4 years ago

@1zaman It looks to be too narrow and quite confusing to be provided out-of-the-box, and you can always write this kind of collectWhen yourself if needed:

started.command(shared.subscriptionCount).distinctUntilChanged().collectLatest {
    when (it) {
        SharingCommand.START -> upstream.collect(shared)
        SharingCommand.STOP -> {}
        SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> shared.resetReplayCache()
    }
}

1zaman commented 4 years ago

Agreed that it's probably a narrow use-case, and might not be warranted in the standard library.

Thinking a bit more on the method definition, it might also be better to define it as an extension method on MutableSharedFlow instead, which takes the upstream Flow as a parameter. Also, I realize now that it's probably not useful to make it a suspending function, since the command flow of the starting strategy might never finish; instead it might be better to provide the scope and let it launch a coroutine from that scope, like the stateIn() operator does, and return a Job to allow cancelling the collection.
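A hedged sketch of that shape (the shareFrom name is hypothetical and this is not part of the library):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Launches the sharing work in the given scope and returns the Job so the caller
// can cancel collection of this particular upstream.
fun <T> MutableSharedFlow<T>.shareFrom(
    upstream: Flow<T>,
    scope: CoroutineScope,
    started: SharingStarted
): Job = scope.launch {
    started.command(subscriptionCount)
        .distinctUntilChanged()
        .collectLatest { command ->
            when (command) {
                SharingCommand.START -> upstream.collect(this@shareFrom)
                SharingCommand.STOP -> {} // collectLatest has already cancelled the previous block
                SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> resetReplayCache()
            }
        }
}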

elizarov commented 4 years ago

πŸ“£ UPDATE: Some final tweaks in the design and worked out PR #2069 with full implementation:

elizarov commented 4 years ago

πŸ“£ UPDATE: PR is now done. Additional changes in the design:

Volatile-Memory commented 4 years ago

I can't wait to use this. I hope it comes out soon, like how StateFlow came out so quickly following its GitHub issue.

Volatile-Memory commented 4 years ago

Is this in the new coroutines 1.3.8 release that is part of the Kotlin 1.4RC?

Volatile-Memory commented 4 years ago

Any updates on when this might come out?

elizarov commented 4 years ago

UPDATE: Last-minute design change. There will be no default value for started parameter. It will have to be explicitly specified.

fluidsonic commented 4 years ago

I still find this confusing. More explanation and examples would be great (in this issue) on:

The network request example was brought up which is exactly what I'm struggling with at the moment, but in a slightly different manner.

Instead of just fetching data from a server once and then pushing it downstream, I want to include refreshes.

Here's a quick example of how it could look in a fully hot scenario. The questions now are: how can I use StateFlow and SharedFlow to

import java.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Data(val num: Int, val expires: Instant)

fun CoroutineScope.fetchData(initial: Data): Flow<Data> =
    flow {
        var data = initial

        log("I: $initial")
        emit(initial)

        while (true) {
            delay(Duration.between(Instant.now(), data.expires).toMillis())
            data = updateData(data).also { log("U: $it") }
            emit(data)
        }
    }
        .broadcastIn(this)
        .asFlow()

fun log(message: String) = println("${Instant.now()} $message")
fun updateData(data: Data) = Data(num = data.num + 1, expires = Instant.now() + Duration.ofSeconds(3))

suspend fun main(): Unit = coroutineScope {
    val dataFlow = fetchData(initial = Data(num = 1, expires = Instant.now() + Duration.ofSeconds(7)))
    log("-- Flow created. No collectors. Starting in 5s…")
    delay(5_000)
    log("-- Adding 2 collectors")
    coroutineScope {
        launch { dataFlow.take(3).onCompletion { log("-- Removed collector 1") }.collect { log("1: $it") } }
        launch { dataFlow.take(5).onCompletion { log("-- Removed collector 2") }.collect { log("2: $it") } }
    }
    delay(5_000)
    log("-- Adding 1 collector")
    dataFlow.take(2).collect { log("3: $it") }
    log("-- Done")

    coroutineContext.cancelChildren()
}
2020-10-11T15:03:52.628690Z -- Flow created. No collectors. Starting in 5s…
2020-10-11T15:03:57.647744Z -- Adding 2 collectors
2020-10-11T15:03:57.675430Z I: Data(num=1, expires=2020-10-11T15:03:59.593134Z)
2020-10-11T15:03:57.681988Z 2: Data(num=1, expires=2020-10-11T15:03:59.593134Z)
2020-10-11T15:03:57.682021Z 1: Data(num=1, expires=2020-10-11T15:03:59.593134Z)
2020-10-11T15:03:59.597450Z U: Data(num=2, expires=2020-10-11T15:04:02.597321Z)
2020-10-11T15:03:59.598067Z 1: Data(num=2, expires=2020-10-11T15:04:02.597321Z)
2020-10-11T15:03:59.598123Z 2: Data(num=2, expires=2020-10-11T15:04:02.597321Z)
2020-10-11T15:04:02.598584Z U: Data(num=3, expires=2020-10-11T15:04:05.598486Z)
2020-10-11T15:04:02.599157Z 2: Data(num=3, expires=2020-10-11T15:04:05.598486Z)
2020-10-11T15:04:02.600014Z 1: Data(num=3, expires=2020-10-11T15:04:05.598486Z)
2020-10-11T15:04:02.610756Z -- Removed collector 1
2020-10-11T15:04:05.602926Z U: Data(num=4, expires=2020-10-11T15:04:08.602840Z)
2020-10-11T15:04:05.603526Z 2: Data(num=4, expires=2020-10-11T15:04:08.602840Z)
2020-10-11T15:04:08.606803Z U: Data(num=5, expires=2020-10-11T15:04:11.606700Z)
2020-10-11T15:04:08.607280Z 2: Data(num=5, expires=2020-10-11T15:04:11.606700Z)
2020-10-11T15:04:08.607662Z -- Removed collector 2
2020-10-11T15:04:11.609861Z U: Data(num=6, expires=2020-10-11T15:04:14.609795Z) // upstream still hot
2020-10-11T15:04:13.610290Z -- Adding 1 collector
2020-10-11T15:04:14.609555Z U: Data(num=7, expires=2020-10-11T15:04:17.609471Z)
2020-10-11T15:04:14.609942Z 3: Data(num=7, expires=2020-10-11T15:04:17.609471Z)
2020-10-11T15:04:17.609524Z U: Data(num=8, expires=2020-10-11T15:04:20.609459Z)
2020-10-11T15:04:17.609707Z 3: Data(num=8, expires=2020-10-11T15:04:20.609459Z)
2020-10-11T15:04:17.609858Z -- Done

Here's an improved example with a rudimentary shareIn that uses BroadcastChannel. I think it does the right thing but I'm not sure.

import java.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*

data class Data(val num: Int, val expires: Instant)

fun fetchData(initial: Data): Flow<Data> {
    var data = initial

    return flow {
        log("I: $data")
        emit(data)

        while (true) {
            delay(Duration.between(Instant.now(), data.expires).toMillis())
            data = updateData(data).also { log("U: $it") }
            emit(data)
        }
    }
}

fun <T> Flow<T>.shareIn(scope: CoroutineScope): Flow<T> {
    val upstream = this
    var broadcastChannel: BroadcastChannel<T>? = null
    val mutex = Mutex()
    var subscriberCount = 0

    return flow {
        val activeChannel = mutex.withLock {
            subscriberCount += 1
            broadcastChannel ?: upstream.broadcastIn(scope).also {
                broadcastChannel = it
                log("-- now hot")
            }
        }

        try {
            activeChannel.consumeEach { emit(it) }
        } finally {
            mutex.withLock {
                subscriberCount -= 1

                if (subscriberCount == 0) {
                    broadcastChannel?.cancel()
                    broadcastChannel = null
                    log("-- now cold")
                }
            }
        }
    }
}

fun log(message: String) = println("${Instant.now()} $message")
fun updateData(data: Data) = Data(num = data.num + 1, expires = Instant.now() + Duration.ofSeconds(3))

suspend fun main(): Unit {
    coroutineScope {
        val dataFlow = fetchData(initial = Data(num = 1, expires = Instant.now() + Duration.ofSeconds(7)))
            .shareIn(this)

        log("-- Flow created. No collectors. Starting in 5s…")
        delay(5_000)
        log("-- Adding 2 collectors")
        coroutineScope {
            launch { dataFlow.take(3).onCompletion { log("-- Removed collector 1") }.collect { log("1: $it") } }
            launch { dataFlow.take(5).onCompletion { log("-- Removed collector 2") }.collect { log("2: $it") } }
        }
        delay(5_000)
        log("-- Adding 1 collector")
        dataFlow.take(2).collect { log("3: $it") }
    }

    log("-- Done")
}
2020-10-11T15:33:28.882805Z -- Flow created. No collectors. Starting in 5s…
2020-10-11T15:33:33.907860Z -- Adding 2 collectors
2020-10-11T15:33:33.949222Z -- now hot
2020-10-11T15:33:33.951699Z I: Data(num=1, expires=2020-10-11T15:33:35.871034Z)
2020-10-11T15:33:33.955681Z 1: Data(num=1, expires=2020-10-11T15:33:35.871034Z)
2020-10-11T15:33:33.955679Z 2: Data(num=1, expires=2020-10-11T15:33:35.871034Z)
2020-10-11T15:33:35.871934Z U: Data(num=2, expires=2020-10-11T15:33:38.871850Z)
2020-10-11T15:33:35.872275Z 1: Data(num=2, expires=2020-10-11T15:33:38.871850Z)
2020-10-11T15:33:35.872317Z 2: Data(num=2, expires=2020-10-11T15:33:38.871850Z)
2020-10-11T15:33:38.873083Z U: Data(num=3, expires=2020-10-11T15:33:41.872996Z)
2020-10-11T15:33:38.873712Z 2: Data(num=3, expires=2020-10-11T15:33:41.872996Z)
2020-10-11T15:33:38.874460Z 1: Data(num=3, expires=2020-10-11T15:33:41.872996Z)
2020-10-11T15:33:38.877840Z -- Removed collector 1
2020-10-11T15:33:41.874234Z U: Data(num=4, expires=2020-10-11T15:33:44.874107Z)
2020-10-11T15:33:41.874722Z 2: Data(num=4, expires=2020-10-11T15:33:44.874107Z)
2020-10-11T15:33:44.878920Z U: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:44.879452Z 2: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:44.881969Z -- now cold
2020-10-11T15:33:44.882125Z -- Removed collector 2
2020-10-11T15:33:49.884995Z -- Adding 1 collector
2020-10-11T15:33:49.885976Z -- now hot
2020-10-11T15:33:49.886278Z I: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:49.886484Z 3: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:49.886677Z U: Data(num=6, expires=2020-10-11T15:33:52.886645Z)
2020-10-11T15:33:49.886817Z 3: Data(num=6, expires=2020-10-11T15:33:52.886645Z)
2020-10-11T15:33:49.887226Z -- now cold
2020-10-11T15:33:49.887507Z -- Done

fluidsonic commented 4 years ago

The shareIn and stateIn documentation in the pull request is very helpful to better understand the concept πŸ‘

It answers most of my questions above.

elizarov commented 4 years ago

@fluidsonic It all boils down to whether you work with the state of something (and only need the latest value of this state) or with a shared stream of events.
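For the refresh scenario above, that distinction would look roughly like this hedged sketch (reusing the fetchData and Data declarations from the earlier snippets; the 5-second stop timeout is an arbitrary illustrative value):

// The latest Data is a piece of state, so stateIn fits: the upstream refresh loop
// runs only while there is at least one subscriber, and late subscribers get the
// latest value immediately.
fun dataState(scope: CoroutineScope, initial: Data): StateFlow<Data> =
    fetchData(initial)
        .stateIn(scope, SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000), initial)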

elizarov commented 4 years ago

UPDATE: Very last-minute design change: the order of parameters in shareIn is changed to be consistent with stateIn. They both accept CoroutineScope and SharingStarted as the first two parameters, followed by additional operator-specific parameters: the initial value for stateIn and the replay size for shareIn. Moreover, since "replay" is an optional feature of a shared flow, we'll make it an optional parameter so that you don't have to specify a magic value of zero to opt out of replay when you don't need it.
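A hedged illustration of the resulting call shapes (upstream and scope are assumed):

fun newCallShapes(upstream: Flow<Int>, scope: CoroutineScope) {
    val events: SharedFlow<Int> = upstream.shareIn(scope, SharingStarted.Eagerly)              // replay defaults to 0
    val replayed: SharedFlow<Int> = upstream.shareIn(scope, SharingStarted.Lazily, replay = 1) // opt in to replay
    val state: StateFlow<Int> = upstream.stateIn(scope, SharingStarted.Eagerly, initialValue = 0)
}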

fluidsonic commented 4 years ago

@fluidsonic It all boils down to whether you work with the state of something (and only need the latest value of this state) or with a shared stream of events.

@elizarov ah, I've completely missed that a StateFlow is conflating. It takes a while to commit all these details into memory.