ReactiveX / RxScala

RxScala – Reactive Extensions for Scala – a library for composing asynchronous and event-based programs using observable sequences
Apache License 2.0

`sliding` doesn't close the window #201

Status: Closed (danielepolencic closed this issue 8 years ago)

danielepolencic commented 8 years ago

Hello, I'd expect sliding to open a new window when the first observable emits an object. I'd also expect that window to be closed when the closing observable emits an object.

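import scala.concurrent.duration._
import rx.lang.scala.{Observable, Subject}

// Shared ticker that emits 0, 1, 2, ... every 100 ms
val source = Observable.interval(100.milliseconds).share
val open = Subject[String]()
val close = Subject[String]()
// Log every signal and every source item
open.subscribe(it => println("open publishes", it))
close.subscribe(it => println("close publishes", it))
source.subscribe(it => println("source", it))

// Fire the opening signal at item 2 and the closing signal at item 6
source.subscribe(it => if (it == 2) open.onNext("YO"))
source.subscribe(it => if (it == 6) close.onNext("OY"))

// Windows are opened by `open`; the closing selector returns Observable.just(1)
source.sliding(open)((_) => Observable.just(1))
  .flatMap(it => it.toList)
  .subscribe(it => println("window: ", it))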

produces the following output:

(source,0)
(source,1)
(source,2)
(open publishes,YO)
(window: ,List())
(source,3)
(source,4)
(source,5)
(source,6)
(close publishes,OY)
(source,7)
(source,8)
(source,9)

Instead I'd expect (window: ,List(3, 4, 5, 6)) to appear just after (close publishes,OY).

The same code works as expected in RxJS: https://jsfiddle.net/c7Lpj3a1/188/

UPDATE: The code does work if I replace Subject[String] with Subject[Object] for both close and open. Now I'm even more confused. Am I supposed to always use Object for signalling opening and closing events? At the moment the method's signature takes Observable[Any], not Observable[Object].

danielepolencic commented 8 years ago

It turns out I don't know how hot/cold observables & sliding work. Sorry.
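
For anyone landing here: the snippet above passes Observable.just(1) as the closing selector, so the close subject is never consulted by sliding. Observable.just(1) emits as soon as it is subscribed to, which is consistent with the empty List() in the output. Below is a minimal sketch, not a confirmed fix from the maintainers, of wiring the closing subject into sliding. It assumes RxScala (rx.lang.scala) is on the classpath and that every subscription is set up before any signal fires. The names SlidingSketch and collectWindows are hypothetical, and a Subject replaces the interval-based source so the sequence of events is explicit.

```scala
import rx.lang.scala.Subject
import scala.collection.mutable.ListBuffer

// Hypothetical helper object, for illustration only.
object SlidingSketch {
  def collectWindows(): List[List[Int]] = {
    val source = Subject[Int]()    // hot source, driven by hand below
    val open = Subject[String]()   // opening signal
    val close = Subject[String]()  // closing signal
    val windows = ListBuffer.empty[List[Int]]

    source
      // The closing selector returns `close`, so each window stays open
      // until `close` emits (or completes).
      .sliding(open)(_ => close)
      .flatMap(it => it.toList)
      .subscribe(w => { windows += w; () })

    source.onNext(0)     // emitted before any opening signal
    open.onNext("YO")    // opens a window
    source.onNext(1)
    source.onNext(2)
    close.onNext("OY")   // closes the window opened above
    source.onNext(3)     // emitted after the window was closed
    windows.toList
  }

  def main(args: Array[String]): Unit =
    collectWindows().foreach(w => println(s"window: $w"))
}
```

Both subjects are hot, so a signal sent before sliding has subscribed is simply missed. With an interval-based source, the order in which the subscriptions are created also decides whether the item that triggers a signal lands inside or outside the window.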