Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

FlowAsPublisher with 1 element doesn't complete after 1 is requested. #3608

Closed. rawilder closed this issue 1 year ago.

rawilder commented 1 year ago

Describe the bug

A FlowAsPublisher wrapping a flow of exactly 1 element suspends and never completes when 1 item is requested.

Provide a Reproducer

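import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Subscriber that requests exactly one element and prints every signal it receives.
val subscriber = object : Subscriber<String> {
    var subscription: Subscription? = null
    override fun onSubscribe(s: Subscription) {
        subscription = s
        s.request(1)
    }

    override fun onNext(t: String) {
        println(t)
    }

    override fun onError(t: Throwable) {
        t.printStackTrace()
    }

    override fun onComplete() {
        println("done")
    }
}

fun main() = runBlocking {
    // One-element flow exposed as a Publisher: prints "string1" but never "done".
    flow { emit("string1") }
        .asPublisher().subscribe(subscriber)

    // The same shape collected as a plain Flow: prints "string2", then "done".
    flow { emit("string2") }
        .onCompletion { println("done") }
        .collect {
            println(it)
        }
}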

Actual

string1
string2
done

Expected

string1
done
string2
done

Maybe this is an irreconcilable difference between the two models. flow { emit("string1") } produces exactly one item, so as a publisher it should complete after emitting that item. However, FlowSubscription suspends after collecting 1 item for a request of 1: https://github.com/Kotlin/kotlinx.coroutines/blob/1.6.4/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt#L240.

It feels wrong that, to complete a FlowAsPublisher of exactly 1 element, you must call request(2) or call request(1) twice.
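To make "request(1) twice" concrete, here is a minimal sketch (not from the issue) of a hypothetical subscriber that requests one element up front and one more from onNext. It assumes kotlinx-coroutines-reactive 1.6.x and the Reactive Streams API are on the classpath, and relies on asPublisher() defaulting to Dispatchers.Unconfined, so signals are delivered synchronously inside the request calls. The helper name signalsWithOneAtATimeDemand is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical helper: subscribes with "one at a time" demand and records every signal.
fun signalsWithOneAtATimeDemand(): List<String> {
    val signals = mutableListOf<String>()
    flow { emit("string1") }.asPublisher().subscribe(object : Subscriber<String> {
        private lateinit var subscription: Subscription

        override fun onSubscribe(s: Subscription) {
            subscription = s
            s.request(1) // first request: enough to receive "string1"
        }

        override fun onNext(t: String) {
            signals += t
            subscription.request(1) // second request: lets the flow run past emit and finish
        }

        override fun onError(t: Throwable) { signals += "error" }

        override fun onComplete() { signals += "done" }
    })
    return signals
}
```

With the extra request(1), the flow is allowed past emit, runs to the end, and onComplete follows.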

rawilder commented 1 year ago

Proposed solution: check whether to suspend before calling onNext, so that the collect block ends if the element is the last one. This leaves the collect block in a slightly odd state, since it technically collects element n+1 even though it was not requested, but if it hangs forever regardless, that may not matter.

    private suspend fun consumeFlow() {
        flow.collect { value ->
            // Suspend if needed before emitting the next value
            if (requested.getAndDecrement() <= 0) {
                suspendCancellableCoroutine<Unit> {
                    producer.value = it
                }
            }
            // Emit the value
            subscriber.onNext(value)
            // Check for cancellation after every emission
            coroutineContext.ensureActive()
        }
    }
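The difference between the current loop and the proposed one can be modeled without coroutines. The following is a toy, single-threaded sketch, not the real FlowSubscription: "suspending" is modeled as returning early (nothing more happens until another request), and the produced counter tracks how many elements the flow body had to compute. The names Outcome, emitThenSuspend and checkThenEmit are hypothetical.

```kotlin
// Toy model: what the subscriber saw, and how many elements the flow body computed.
data class Outcome(val signals: List<String>, val produced: Int)

// Current behavior: emit the element, then suspend as soon as demand reaches zero.
fun emitThenSuspend(elements: List<String>, demand: Long): Outcome {
    val signals = mutableListOf<String>()
    var requested = demand
    var produced = 0
    for (element in elements) {
        produced++ // the flow body computes this element
        signals += element // onNext
        requested--
        if (requested <= 0) return Outcome(signals, produced) // suspended inside emit
    }
    signals += "complete" // onComplete
    return Outcome(signals, produced)
}

// Proposed behavior: check demand before emitting, so the last element lets the flow finish.
fun checkThenEmit(elements: List<String>, demand: Long): Outcome {
    val signals = mutableListOf<String>()
    var requested = demand
    var produced = 0
    for (element in elements) {
        produced++ // computed even if it is not going to be delivered yet
        if (requested <= 0) return Outcome(signals, produced) // suspended holding an unrequested element
        requested--
        signals += element // onNext
    }
    signals += "complete" // onComplete
    return Outcome(signals, produced)
}
```

With one element and a demand of 1, only the proposed loop completes; with two elements, it pays for that by computing the second, unrequested element before suspending.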
dkhalanskyjb commented 1 year ago

I don't see any bug here. You signal that you only want one element, so that's what you receive. Further computations of elements you don't need won't happen. I think that, for a lot of use cases, not doing extraneous computations is a big upside of the current behavior.

In your example, collect is semantically equivalent to request(Long.MAX_VALUE), because it processes an unbounded number of elements. If the flow had many elements, all of them would be processed.
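A sketch of that trade-off, assuming kotlinx-coroutines-reactive 1.6.x and the default Dispatchers.Unconfined context of asPublisher() (so everything runs synchronously inside request). The produced counter is a hypothetical stand-in for an expensive element computation, and requesting Long.MAX_VALUE plays the role of collect.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical result type: signals seen by the subscriber and elements built by the flow body.
data class DemandResult(val signals: List<String>, val produced: Int)

fun runWithDemand(initialRequest: Long): DemandResult {
    var produced = 0
    val signals = mutableListOf<String>()
    val source = flow {
        for (i in 1..3) {
            produced++ // stands in for an expensive computation of element i
            emit(i)
        }
    }
    source.asPublisher().subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) = s.request(initialRequest)
        override fun onNext(t: Int) { signals += t.toString() }
        override fun onError(t: Throwable) { signals += "error" }
        override fun onComplete() { signals += "done" }
    })
    return DemandResult(signals, produced)
}
```

With request(1), only the first element is ever built and the flow stays suspended inside emit; with unbounded demand, all three are built and onComplete follows.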

rawilder commented 1 year ago

You're right, Flow.collect is not quite a fair comparison: collect does request all of the elements. That being said, in the more generic publisher/subscriber contract, I do believe that when the elements requested == the elements emitted, the publisher should call complete on the subscriber, which in the case of FlowAsPublisher happens after the completion of the collect block.

Here are two other reactive libraries, reactor and rxjava. Flux and Flowable implement Publisher same as FlowAsPublisher.

    Flux.from(publish { send("string3") })
        .subscribe(subscriber)

    Flowable.fromPublisher<String>(publish { send("string4") })
        .subscribe(subscriber)

Output:

string3
done
string4
done

Using the same subscriber as defined above, they both complete.

dkhalanskyjb commented 1 year ago

That being said, in the more generic publisher/subscriber contract, I do believe that when the elements requested == the elements emitted, the publisher should call complete on the subscriber

Why? Do you have any use cases where the behavior you propose would serve some actual need better? I can certainly imagine cases when the current behavior is desirable: as I said, it avoids executing code that creates elements that ultimately won't be needed. But I struggle to see the benefits of doing things the way you propose.

Here are two other reactive libraries, reactor and rxjava. Flux and Flowable implement Publisher same as FlowAsPublisher.

You show conversions from a Publisher to something else, but this issue is about converting something else (in this case, Flow) to a Publisher, so these aren't really comparable. In any case, even if other libraries do this, it is not enough on its own to introduce a change, especially a breaking one. It needs to be driven by actual needs.

rawilder commented 1 year ago

The reason I went down this rabbit hole in the first place is that another library (Micronaut) was hanging forever because of this behavior, specifically when an intermediary step implemented with a flow was introduced. The other reactive libraries as an intermediary do not exhibit this behavior.

The source subscriber was requesting exactly one element from a publisher that it knew should produce one element. I introduced a filter step that used flows, to be compatible with other suspending functions. Because of this behavior, that filter step stops the subscriber from ever receiving an onComplete signal.

You show conversions from a Publisher to something else, but this issue is about converting something else (in this case, Flow) to a Publisher, so these aren't really comparable.

Flowable and Flux both implement Publisher. I could have created them manually with their builders, but this was simpler. Publisher has a contract that should be filled, which it seems like FlowAsPublisher is not filling. I wrote a TCK test for FlowAsPublisher, and it fails a few of the tests (although 2 of them are just about signaling IllegalArgumentException when request is called with n <= 0).

I've been trying to find documentation or best practices on when a publisher should complete, but even the specification is a bit ambiguous for this case. https://github.com/reactive-streams/reactive-streams-jvm#1.5 states that a finite stream should signal onComplete upon successful termination, but it isn't clear what counts as successful termination. Likewise, the summary section's phrase "or an onComplete signal when no more elements are available" could mean either before or after another request. I've opened an issue on the GitHub repo to see whether I can get some discussion on this topic: https://github.com/reactive-streams/reactive-streams-jvm/issues/543

dkhalanskyjb commented 1 year ago

It's clearly the Micronaut library that's non-compliant: https://github.com/reactive-streams/reactive-streams-jvm#2.6

A Subscriber MUST call Subscription.cancel() if the Subscription is no longer needed.

We are not going to change our behavior for a worse one to accommodate a non-compliant library.
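A subscriber that knows it needs only one element can follow rule 2.6 by cancelling once that element arrives, instead of waiting for onComplete. A minimal sketch, assuming kotlinx-coroutines-reactive 1.6.x; SingleElementSubscriber and its onDone callback are hypothetical names, not Micronaut's API.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical subscriber for a source that is expected to yield exactly one element.
// It cancels as soon as it has that element (rule 2.6) rather than waiting for onComplete.
class SingleElementSubscriber(private val onDone: (String?) -> Unit) : Subscriber<String> {
    private lateinit var subscription: Subscription
    private var finished = false

    override fun onSubscribe(s: Subscription) {
        subscription = s
        s.request(1)
    }

    override fun onNext(t: String) {
        if (finished) return
        finished = true
        subscription.cancel() // the Subscription is no longer needed
        onDone(t)
    }

    override fun onError(t: Throwable) {
        if (!finished) { finished = true; onDone(null) }
    }

    override fun onComplete() {
        if (!finished) { finished = true; onDone(null) } // empty source
    }
}
```

The finished flag makes the callback fire at most once, whichever terminal path is taken. For example, `flow { emit("string1") }.asPublisher().subscribe(SingleElementSubscriber { println(it) })` prints string1 without relying on onComplete.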

The other reactive libraries as an intermediary do not exhibit this behavior.

I found an example where they do:

    Flowable.fromObservable(
        Observable.create { emitter ->
            emitter.onNext("Hello")
        },
        BackpressureStrategy.DROP
    ).subscribe(subscriber)

For me, this prints just Hello, without done.

Publisher has a contract that should be filled, which it seems like FlowAsPublisher is not filling.

If you actually find any non-compliance, please notify us, we take such issues seriously.

rawilder commented 1 year ago

After a brief discussion with a reactive streams contributor in the above link, and given your very compelling arguments, I'll concede this change is definitely incorrect. I'll open a PR with the Micronaut team instead, and close my PR against this issue.

That being said, while investigating this, I implemented the PublisherVerification test for FlowAsPublisher, much like here https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt, and observed a few failing tests.
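For readers who haven't used the TCK, this is roughly the shape of such a verification. It is a sketch assuming the org.reactivestreams:reactive-streams-tck artifact (with TestNG) and kotlinx-coroutines-reactive 1.6.x, not the code from the linked ReactiveStreamTckTest.kt or the PR; the class name FlowAsPublisherTckTest is hypothetical, and as written, the spec109 and spec309 tests discussed in this thread would still fail.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Publisher
import org.reactivestreams.tck.PublisherVerification
import org.reactivestreams.tck.TestEnvironment

// Sketch of a TCK suite for FlowAsPublisher; TestNG discovers the inherited rule tests.
class FlowAsPublisherTckTest : PublisherVerification<Long>(TestEnvironment()) {

    // A publisher that emits exactly `elements` items, then completes.
    override fun createPublisher(elements: Long): Publisher<Long> =
        flow { for (i in 0L until elements) emit(i) }.asPublisher()

    // A publisher that fails; with FlowAsPublisher the error only surfaces once request(n) starts collection.
    override fun createFailedPublisher(): Publisher<Long> =
        flow<Long> { throw IllegalStateException("failed publisher") }.asPublisher()
}
```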

spec309 is simple enough to fix, if somewhat breaking (signaling an IllegalArgumentException where none was signaled before).

For spec109, a failing flow { } does not signal its exception if request is never called, because FlowAsPublisher does not start collecting until request is called. I wasn't sure how to fix this one. See here: https://github.com/reactive-streams/reactive-streams-jvm/blob/master/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java#L527
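The lazy start can be observed directly. A minimal sketch, assuming kotlinx-coroutines-reactive 1.6.x and asPublisher()'s default Dispatchers.Unconfined context; RecordingSubscriber and failingFlowSignals are hypothetical helpers. Subscribing to a failing flow produces only onSubscribe, and the failure reaches onError only once request(1) starts the collection.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Records every signal; never requests on its own.
class RecordingSubscriber : Subscriber<String> {
    val signals = mutableListOf<String>()
    lateinit var subscription: Subscription

    override fun onSubscribe(s: Subscription) { subscription = s; signals += "subscribed" }
    override fun onNext(t: String) { signals += "next:$t" }
    override fun onError(t: Throwable) { signals += "error:${t.message}" }
    override fun onComplete() { signals += "done" }
}

// Returns the signals seen before and after the first request.
fun failingFlowSignals(): Pair<List<String>, List<String>> {
    val subscriber = RecordingSubscriber()
    flow<String> { throw IllegalStateException("boom") }.asPublisher().subscribe(subscriber)
    val beforeRequest = subscriber.signals.toList() // only onSubscribe so far
    subscriber.subscription.request(1)              // collection, and the failure, start here
    return beforeRequest to subscriber.signals.toList()
}
```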

The failing specs

required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe

required_spec309_requestNegativeNumberMustSignalIllegalArgumentException

required_spec309_requestZeroMustSignalIllegalArgumentException

PR that includes test and spec309 fix https://github.com/Kotlin/kotlinx.coroutines/pull/3616
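For reference, rule 3.9 asks that, while the subscription is not cancelled, request(n) with n <= 0 signal onError with an IllegalArgumentException, which, going by the failing spec309 tests, FlowAsPublisher does not do. A stdlib-only sketch of demand bookkeeping that follows rules 1.6, 3.6, 3.9 and 3.17; CompliantDemand is a hypothetical class, not part of kotlinx.coroutines or the PR.

```kotlin
// Hypothetical demand bookkeeping for a Subscription that follows the spec's request rules.
class CompliantDemand(private val signalError: (Throwable) -> Unit) {
    var requested = 0L
        private set
    var terminated = false
        private set

    fun request(n: Long) {
        // Rules 1.6 / 3.6: once terminated or cancelled, further requests are no-ops.
        if (terminated) return
        if (n <= 0) {
            terminated = true
            // Rule 3.9: signal (not throw) an IllegalArgumentException.
            signalError(IllegalArgumentException("Rule 3.9: non-positive request signals are illegal, got $n"))
            return
        }
        // Rule 3.17: saturate at Long.MAX_VALUE ("effectively unbounded") instead of overflowing.
        requested = if (requested > Long.MAX_VALUE - n) Long.MAX_VALUE else requested + n
    }
}
```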

Sorry for all the discussion, but I am learning a lot, so thank you for your time!

dkhalanskyjb commented 1 year ago

These failures are expected. We discussed them and, in the end, decided that no one will realistically rely on this behavior: who would benefit from us requiring that request is called? This would likely only lead to more issues about us being incompatible with some non-compliant subscribers that do not actually call request (or call it with a negative number, etc.). In theory, someone could have code like

    override fun onSubscribe(s: org.reactivestreams.Subscription?) {
        subscription = s
        try {
            subscription?.request(n)
        } catch (e: IllegalArgumentException) {
            // according to the reactive streams spec, n is not positive
            subscription?.request(1)
        }
    }

In practice, I think we all agree that such code looks really unnatural. So, here we decided to be good citizens rather than law-abiding ones. If this ever causes someone problems, of course, we'll have no option but to become compliant.

rawilder commented 1 year ago

That makes sense, thank you. In that case, I've updated my PR to only implement the PublisherVerification test, with the appropriate test skips added: https://github.com/Kotlin/kotlinx.coroutines/pull/3616/files