Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.94k stars 1.84k forks source link

Most of the times sharedFlow doesn't emit the initial value #2338

Open CoreFloDev opened 3 years ago

CoreFloDev commented 3 years ago

Hi,

I have been testing sharedFlow over the weekend and I got a strange behaviour with it. I have got some code like that:

class MainScreen(
    private val incrementCounterUseCase: IncrementCounterUseCase,
    private val loadMovieListUseCase: LoadMovieListUseCase
) : Screen<MainInput, MainOutput>() {

    override fun output(): Flow<MainOutput> = input.receiveAsFlow()
        .let(inputToAction())
        .let { stream ->
            val upstream = stream.shareIn(scope, SharingStarted.Lazily)

            listOf(
                upstream.filterIsInstance<Action.InitialAction>().let(loadMovieListUseCase()),
                upstream.filterIsInstance<Action.IncrementNumber>().let(incrementCounterUseCase())
            )
                .merge()
        }

    companion object {
        fun inputToAction() = FlowTransformer<MainInput, Action> { flow ->
            flow.map { input ->
                when (input) {
                    MainInput.Click -> Action.IncrementNumber
                    MainInput.RetryClicked -> Action.InitialAction
                }
            }.onStart {
                emit(Action.InitialAction)
            }
        }
    }
}

If I debug using doOnEach, I was able to see that the Action.InitialAction is emitted every time. However after the shareIn, the stream is not always setup and the function loadMovieListUseCase doesn't receive that InitialAction. And the strange part, it is that the next event on the stream, always respond properly. I have tried both SharingStarted.Lazily and Eagerly, it seems that eagerly make it worth.

Maybe I am missing something, note that I have also tried with coroutine 1.4.0 and it didn't fixed the issue.

Thank you for your time!

CoreFloDev commented 3 years ago

I have noticed that if I add a delay(100) on the onStart it works fine everytime

diousk commented 3 years ago

Have you tried onSubscription mentioned in https://github.com/Kotlin/kotlinx.coroutines/issues/2034 ?

It is similar to onStart with one big difference. onStart calls the action before a subscription to the shared flow is established. It means that if onStart action initiates an operation that emits the values into the shared flow, there is no guarantee that those emissions will be received by this downstream collector. However, onSubscription calls the action after a subscription to the shared flow is established, guaranteeing the reception of all the emitted values after this moment (assuming they are not dropped on buffer overflow).

Is that the case?

CoreFloDev commented 3 years ago

Hi @diousk

Thanks for your reply. I had a look at it when you mentioned onSubscription. I think that operator would work but it seems that it isn't available upstream on the flow class.

elizarov commented 3 years ago

The onSubscription operator is supported only on a SharedFlow by design, since only shared flows have a concept of "subscription".

Does it help?

CoreFloDev commented 3 years ago

Hi @elizarov

yes I got that thanks. But that's still an issue for me as that onStart/OnSubscribe action if made on the upstream. I think it would help if the first value received by a sharedFlow was waiting for it initialisation before consuming it. Would that be possible to do?

Thanks!

elizarov commented 3 years ago

@CoreFloDev Can you, please, explain in more detail what are you trying to achieve. Please, give some more background. What kind of events you are sending? How many consumers of those events your application architecture has? Maybe you are looking for a Channel to transfer your data, not a shared flow?

CoreFloDev commented 3 years ago

Sure, I am creating a classic map reduce application also called redux pattern.

the overall architecture looks like this: Screenshot from 2020-11-24 23-00-19

I am also using the action mapper as a kicker to start the system. To do so, it send an initialAction event that is consumed by all the useCases that needs to execute at the application startup (usually a network request to fetch some data).

Also by digging more into it and with the help of a friend, we figured out that the system was working if we used the sharedflow like this:

val upstream = stream.shareIn(scope, SharingStarted.Eagerly, 1)

It seems that in that configuration, the way it is used make that it never do the replay, I am not really sure why.

Does that help? I can also share the entire project but that could be a bit too much of code.

Thanks!

twyatt commented 3 years ago

@CoreFloDev are you possibly running into a similar behavior I was initially confused by as well? https://github.com/Kotlin/kotlinx.coroutines/pull/2069#issuecomment-702523785

elizarov commented 3 years ago

@CoreFloDev Can you, please, share some more snippets of code -- a full example of the code for one use-case would be great.

CoreFloDev commented 3 years ago

Sure the code lives there: https://github.com/CoreFloDev/flowarch/blob/main/app/src/main/java/com/example/flow_arch/main/arch/MainScreen.kt, I have make the repository public as it is only a prototype for a migration from rxjava to flow. The overall structure takes the shape of an existing rxjava implementation.

The usecases implementation are available here: https://github.com/CoreFloDev/flowarch/tree/main/app/src/main/java/com/example/flow_arch/main/usecases

Let me know if that helps, thanks!

elizarov commented 3 years ago

In your particular architecture the easiest solution I see if to inject InitialAction downstream from shareIn operator, not upstream of it. You can also change replay to zero:

stream.shareIn(scope, SharingStarted.Eagerly).onStart { emit(Action.InitialAction) }

This way, every subscriber is guaranteed to get InitialAction first, followed by further (shared) actions.

CoreFloDev commented 3 years ago

That's similar to the onSubscribe solution, I guess that's all the possible solution at the moment. Maybe I was thinking to build a new type of SharingStarted that would support onStart from upstream, do you think that's achievable? I am not very familiar with coroutine internals.

elizarov commented 3 years ago

In general -- no. The design of the flow is very simple and it does not support the concept of "subscription" in general case. For an arbitrary flow you cannot tell when the "subscription" had happened, so that's why onSubscribe operator is only supported by special, shared flows.

CoreFloDev commented 2 years ago

Hi,

My friend Pawel also got that issue, he found a way around my using a merge like this:

        fun inputToAction() = FlowTransformer<MainInput, Action> { flow ->
            merge(flowOf(Action.InitialAction),
                flow.map { input ->
                    when (input) {
                        MainInput.Click -> Action.IncrementNumber
                        MainInput.RetryClicked -> Action.InitialAction
                    }
                })
        }

Just posting the solution for future reference in case someone get the same issue Thanks!

Erratum: Nan doesn't work still