ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

"subscribe_on" and Observable.create "scheduler" parameter inconsistent #541

Open freelancer1845 opened 4 years ago

freelancer1845 commented 4 years ago

I'm a little confused about subscribe_on and the 'scheduler' parameter of Observable.create:

Example

import rx
import rx.scheduler
import rx.operators as op
import time
import threading

scheduler = rx.scheduler.ThreadPoolScheduler(max_workers=5)

def producer(observer, scheduler):
    print("Provided scheduler {} - Current Thread: {}".format(scheduler,
                                                              threading.current_thread().name))
    observer.on_next(scheduler)
    observer.on_completed()

obs: rx.Observable = rx.create(producer)

def handle(s):
    if (s == scheduler):
        print("Worked")
    else:
        print("Failed")

print("First Option")
obs.pipe(
    op.subscribe_on(scheduler=scheduler)
).subscribe(lambda x: handle(x), scheduler=scheduler)
time.sleep(0.25)

print("Second Option")
obs.pipe(
    # op.subscribe_on(scheduler=scheduler)
).subscribe(lambda x: handle(x), scheduler=scheduler)

time.sleep(0.25)
print("Third Option")
obs.pipe(
    op.subscribe_on(scheduler=scheduler)
).subscribe(lambda x: handle(x))

time.sleep(1.0)

Options from example

I would expect...

  1. that the passed-in scheduler is the ThreadPoolScheduler (but it is None) and that the subscribe logic is executed by a ThreadPool worker (True)
  2. that the passed-in scheduler is the ThreadPoolScheduler (True) and that the subscribe logic is executed on any thread (in this case it's the MainThread, but maybe it would be more logical to run it on the scheduler passed to the subscribe call, i.e. the ThreadPool?)
  3. that the passed-in scheduler is, somehow, also the ThreadPoolScheduler (it is not). I can see why, but then I'm asking myself how the "scheduler" parameter can be used at all. The subscribe logic runs correctly on a worker thread.

I guess it all revolves around understanding what the scheduler parameter received by the subscribe function passed to Observable.create really represents.

I hope you can understand my problem.

rx==3.1.1 python=3.8.3-64bit Windows

Cheers Jascha

MainRo commented 4 years ago

The role of the scheduler parameter is different depending on where it is provided.

First, the scheduler parameter of subscribe is used to propagate a default scheduler that operators can use to emit events. This value is propagated upstream to the scheduler parameter of each subscription function. Before RxPY v3, every operator that required a scheduler had its own scheduler parameter. This was inconvenient because you had to provide the same scheduler in many places.
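The propagation described above can be pictured with a small toy model. This is only a sketch, not RxPY's implementation: TinyObservable, tiny_map and default_scheduler are hypothetical names made up for illustration, and the "scheduler" is just a placeholder object.

```python
# Toy model only: TinyObservable and tiny_map are hypothetical, not RxPY classes.
class TinyObservable:
    def __init__(self, subscribe_fn):
        # subscribe_fn(on_next, scheduler) has the same shape as a create() callback.
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next, scheduler=None):
        # The scheduler given here is only a default that is handed upstream.
        return self._subscribe_fn(on_next, scheduler)


def tiny_map(mapper):
    """Operator that transforms items and forwards the default scheduler upstream."""
    def operator(source):
        def subscribe_fn(on_next, scheduler):
            return source.subscribe(lambda x: on_next(mapper(x)), scheduler=scheduler)
        return TinyObservable(subscribe_fn)
    return operator


seen = []      # schedulers received by the source
results = []   # items received by the final observer


def producer(on_next, scheduler):
    seen.append(scheduler)
    on_next(1)


default_scheduler = object()  # stand-in for a real scheduler instance
pipeline = tiny_map(lambda x: x * 10)(TinyObservable(producer))
pipeline.subscribe(results.append, scheduler=default_scheduler)
```

In this model the source sees exactly the object that was passed to the outermost subscribe call, because every operator in between hands it on unchanged.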

The subscribe_on operator is used to execute the subscription and disposal of an observable on a scheduler other than the default one. As a consequence, operators that come before the subscribe_on operator in a pipe have their subscribe function executed on this scheduler. This is what you see in 1 and 3. Otherwise, subscription and disposal are done in the context of the call to subscribe (the main thread in your examples). This is what you see in 2.

Currently, the subscribe_on operator does not forward the default scheduler upstream. I will check, but this is probably a bug. This explains why, in 1 and 3, the producer receives None as its scheduler. It should - TBC - be the scheduler provided in subscribe.
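To make the difference concrete, here is a toy sketch of a subscribe_on-like operator that either drops or forwards the default scheduler. It is an assumption-laden model, not the RxPY source: TinyObservable, tiny_subscribe_on, the forward_default flag and the "toy-worker" thread are all hypothetical, and a plain thread stands in for a real scheduler.

```python
import threading


# Toy model only: these names are hypothetical and not part of RxPY.
class TinyObservable:
    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next, scheduler=None):
        return self._subscribe_fn(on_next, scheduler)


def tiny_subscribe_on(forward_default):
    """Run the upstream subscription on a worker thread.

    forward_default=False models the behaviour reported in this issue;
    forward_default=True models the behaviour suggested as the fix.
    """
    def operator(source):
        def subscribe_fn(on_next, scheduler):
            upstream = scheduler if forward_default else None
            worker = threading.Thread(
                target=lambda: source.subscribe(on_next, scheduler=upstream),
                name="toy-worker",
            )
            worker.start()
            return worker
        return TinyObservable(subscribe_fn)
    return operator


def run(forward_default, default_scheduler):
    seen = []

    def producer(on_next, scheduler):
        # Record which scheduler arrived and on which thread we were subscribed.
        seen.append((scheduler, threading.current_thread().name))
        on_next(scheduler)

    source = TinyObservable(producer)
    worker = tiny_subscribe_on(forward_default)(source).subscribe(
        lambda _: None, scheduler=default_scheduler
    )
    worker.join()  # wait for the subscription to finish
    return seen[0]


marker = object()  # stand-in for the scheduler passed to subscribe
dropped = run(forward_default=False, default_scheduler=marker)
forwarded = run(forward_default=True, default_scheduler=marker)
```

In both runs the producer executes on the worker thread. Only the value it receives as scheduler changes: None when the default is dropped, the original object when it is forwarded.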