ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.76k stars 361 forks source link

Sample does not appear to emit `on_completed` #645

Open markovejnovic opened 2 years ago

markovejnovic commented 2 years ago

Describe the bug It appears to me that running operators.sample prevents the on_completed event from getting fired, while on_error works as expected.

To Reproduce Steps to reproduce the behavior:

  1. Create a subject that is fed with a custom object periodically.
  2. Subscribe to the subject with a operators.sample filter.
  3. Call on_completed
  4. Notice it doesn't get emitted to the subscribed observer.

Expected behavior on_completed is correctly called.

Code or Screenshots

subject = rx.Subject()
def feed_subject(subject: rx.Subject):
    for i in range(100):
        subject.on_next(f'Hello World {i}')
        time.sleep(1e-2)
provider_thread = threading.Thread(target=feed_subject, args=(subject, ))
window = ObserverMainWindow(test_view, subject)

provider_thread.start()
provider_thread.join()

subject.on_completed()

ObserverMainWindow is a function that looks something like:

def ObserverMainWindow(child, data: rx.Observable):
    # ...

    def rerender_window(state):
        # ...

    def close():  # Never fired
        print(f'closing', file=open('test.txt', '+a'))
        # ...

    def on_err(error):  # Correctly fired if `on_error` is called.
        print('closing error', file=open('test.txt', '+a'))
        # ...

    # This does not work
    data.pipe(rxops.sample(1 / 60)).subscribe(
        rerender_window, on_err, close)
    # This for example works as expected:
    # data.pipe(rxops.map(lambda x: x)).subscribe(
    #   rerender_window, on_err, close)

Additional context

I apologize in advance if I'm missing something obvious.

Edits

  1. Added an example of what works
  2. Formatting
  3. Typos
dbrattli commented 2 years ago

Please submit a minimal self contained code example. Minimal means that there should not be any other code. E.g what is test_view, child. Self-contained means that one should be able to copy the code and run it without having to guess which imports are being used.

matiboy commented 1 year ago

Looks like you're not giving the sample the time to emit because main thread dies. Below is a simplified version to reproduce. Change the last time.sleep's value to something lower than the sample duration and you will not see "I am complete" but if you put a value like say 0.1 then you see "I am complete".

import reactivex
from reactivex import operators

subject = reactivex.Subject()
subject.pipe(
    operators.sample(2.5*1e-2)
).subscribe(
    on_next=lambda x: print('NEXT', x),
    on_completed=lambda: print('I am complete')
)

# this has no effect on the story
def feed_subject(s: reactivex.Subject):
    for i in range(8):
        s.on_next(f'Hello World {i}')
        time.sleep(1e-2)

feed_subject(subject)
subject.on_completed()
time.sleep(1e-3) # <--- change this to 0.1 and you'll see the on_completed

The reason for this is that sample only emits on_complete on the next sample iteration. You can see this in the below test (which passes); I skipped all the emitting part, just completed the cold observable at 6.5seconds. As you can see the emit only happens at 8=0.5 (subscription starts) + 2.5 + 2.5 + 2.5 <- next sample, not at 0.5+6.5

def test_issue_645_reproduce():
    scheduler = TestScheduler()
    subject = reactivex.Subject()
    scheduler.schedule_relative(6.5, lambda *_: subject.on_completed())

    results = scheduler.start(
        lambda: subject.pipe(
            operators.sample(2.5)
        ), created=0.01, subscribed=0.5, disposed=20
    )
    assert results.messages == [
        on_completed(8)
    ]

Tbh I don't know if that's supposed to be the case though. Or whether on source completion, sample should emit completion immediately.

matiboy commented 1 year ago

Following up on this, looking at RxJS specs it looks like the expected behaviour is to complete if source completes and even drop any value during the last (incomplete) sample.

I've reproduced the test and it indeed fails.

@dbrattli I would think this is a bug, or at least an inconsistency with RxJS (sorry that's my "reference" library, can't really find the tests in other languages)