vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0
145 stars 72 forks source link

300 Make the ReadStreamSubscriber serialized #305

Closed NilsRenaud closed 6 months ago

NilsRenaud commented 6 months ago

Cherry-picked from 4.x branch

Ensure the ReadStream handler is never called concurrently.

Note: The Rx1 implementation is different because such stack are possible:

at io.vertx.rx.java.ReadStreamSubscriber.onNext(ReadStreamSubscriber.java:244) <-- won't emit its element since `emitting == true`
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowPath(OnSubscribeFromIterable.java:117)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
    at rx.Subscriber.request(Subscriber.java:157)
    at io.vertx.rx.java.ReadStreamSubscriber.lambda$checkStatus$3(ReadStreamSubscriber.java:175)
    at io.vertx.rx.java.ReadStreamSubscriber.checkStatus(ReadStreamSubscriber.java:190)
    at io.vertx.rx.java.ReadStreamSubscriber.serializedCheckStatus(ReadStreamSubscriber.java:115) <-- Set `emitting = true`
    at io.vertx.rx.java.ReadStreamSubscriber.handler(ReadStreamSubscriber.java:70)

Motivation:

Explain here the context, and why you're making that change, what is the problem you're trying to solve.

Conformance:

Your commits should be signed and you should have signed the Eclipse Contributor Agreement as explained in https://github.com/eclipse/vert.x/blob/master/CONTRIBUTING.md Please also make sure you adhere to the code style guidelines: https://github.com/vert-x3/wiki/wiki/Vert.x-code-style-guidelines