ReactiveX / RxScala

RxScala – Reactive Extensions for Scala – a library for composing asynchronous and event-based programs using observable sequences
Apache License 2.0
888 stars 121 forks source link

MissingBackpressureException when using Publishers #200

Closed mcenkar closed 8 years ago

mcenkar commented 8 years ago

Hi,

seemingly similar code using org.reactivestreams.Publisher throws MissingBackpressureException while code using scala Observables doesn't. Example:

v0.26.2

import org.reactivestreams.Publisher
import rx.RxReactiveStreams
import rx.lang.scala.JavaConversions._
import rx.lang.scala.Observable

import scala.collection.JavaConverters._

object Test extends App {

  val xD = Observable.just(WrapperO(Observable.from(1 to 350)))
    .flatMap(_.result)
    .toBlocking
    .toList

  println(xD.size)

  val xDD = Observable.just(WrapperP(RxReactiveStreams.toPublisher(rx.Observable.from((1 to 350).asJava))))
    .flatMap(x => RxReactiveStreams.toObservable(x.result))
    .toBlocking
    .toList

  println(xDD.size)

}

case class WrapperP(result: Publisher[_])

case class WrapperO(result: Observable[_])

Output:

350 // <-- from first one
Exception in thread "main" java.lang.RuntimeException: rx.exceptions.MissingBackpressureException
    at rx.exceptions.Exceptions.propagate(Exceptions.java:52)
    at rx.internal.operators.BlockingOperatorToIterator$SubscriberIterator.hasNext(BlockingOperatorToIterator.java:101)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
    at scala.collection.AbstractTraversable.to(Traversable.scala:104)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
    at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
    at rx.lang.scala.observables.BlockingObservable$.toList$extension(BlockingObservable.scala:232)
    at com.leonteq.website.feed.server.service.timeseries.Test$.delayedEndpoint$com$leonteq$website$feed$server$service$timeseries$Test$1(Test.scala:18)
    at com.leonteq.website.feed.server.service.timeseries.Test$delayedInit$body.apply(Test.scala:9)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.leonteq.website.feed.server.service.timeseries.Test$.main(Test.scala:9)
    at com.leonteq.website.feed.server.service.timeseries.Test.main(Test.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:352)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:346)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:329)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
    at rx.internal.reactivestreams.SubscriberAdapter.onNext(SubscriberAdapter.java:52)
    at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107)
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:139)
    at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:52)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable.subscribe(Observable.java:8265)
    at rx.Observable.subscribe(Observable.java:8232)
    at rx.internal.reactivestreams.PublisherAdapter.subscribe(PublisherAdapter.java:43)
    at rx.RxReactiveStreams$1.call(RxReactiveStreams.java:58)
    at rx.RxReactiveStreams$1.call(RxReactiveStreams.java:55)
    at rx.Observable.unsafeSubscribe(Observable.java:8172)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastpath(OnSubscribeFromIterable.java:127)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:70)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable.subscribe(Observable.java:8265)
    at rx.Observable.subscribe(Observable.java:8232)
    at rx.internal.operators.BlockingOperatorToIterator.toIterator(BlockingOperatorToIterator.java:53)
    at rx.observables.BlockingObservable.getIterator(BlockingObservable.java:147)
    at rx.observables.BlockingObservable$2.iterator(BlockingObservable.java:416)
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
    ... 26 more

Swapping:

    .toBlocking
    .toList

with

    .toList
    .toBlocking
    .single

seems to solve the problem

akarnokd commented 8 years ago

What versions are you using?

mcenkar commented 8 years ago

v0.26.2, RxJava v1.1.6

akarnokd commented 8 years ago

This is the Java code reproducing the issue:

Observable<Integer> o = Observable.range(1, 350);

Observable<Publisher<Integer>> p = Observable.just(
        RxReactiveStreams.toPublisher(o)).asObservable();

for (int u : p.flatMap(v -> RxReactiveStreams.toObservable(v))
.toBlocking()
.toIterable()) {
    System.out.println(u);
}

I'll investigate further.

akarnokd commented 8 years ago

Thanks for your feedback. This is an issue with RxJavaReactiveStreams and has been fixed in 1.1.1; I just released it and may take some time to show up in maven.

mcenkar commented 8 years ago

Awesome, thanks!