ReactiveX / RxPY

ReactiveX for Python
MIT License

How to on_next from background thread in main thread? #667

Open alek5k opened 1 year ago

alek5k commented 1 year ago

Hi, I want to pass data from a background thread into the main thread. I have the following code, but it seems that CurrentThreadScheduler() is not doing what I expect.

I'm using reactivex version 4.2.

from reactivex import operators as ops
from reactivex.scheduler import CurrentThreadScheduler, ThreadPoolScheduler
from reactivex import Subject
import threading
from threading import Thread

class MyThread(Thread):
    def __init__(self, callback):
        super().__init__()
        self.callback = callback

    def run(self):
        # Runs in the background thread; the payload value is arbitrary
        self.callback("data")

my_subject = Subject()

def callback(data):
    ''' This is called in a separate thread '''
    print(f"In callback: {threading.current_thread().name}")
    my_subject.on_next(data)

if __name__ == "__main__":

    thread_to_execute_on = CurrentThreadScheduler()
    # thread_to_execute_on = ThreadPoolScheduler(max_workers=1)

    print(f"Before stream: {threading.current_thread().name}")

    background_thread = MyThread(callback=callback)

    my_subject.pipe(
        ops.observe_on(thread_to_execute_on)
    ).subscribe(
        lambda data: print(f"In subscription: {threading.current_thread().name}")
    )

    background_thread.start()
    background_thread.join()

The output is:

Before stream: MainThread
In callback: Thread-7
In subscription: Thread-7

I've also tried using ThreadPoolScheduler and the data is correctly passed to the threadpool thread. In that scenario, output is:

Before stream: MainThread
In callback: Thread-7
In subscription: ThreadPoolExecutor-0_0
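
One way to read the difference between the two runs: a thread pool owns a worker thread that waits for submitted work, so there is somewhere for the item to be picked up. The sketch below is a standard-library analogue only, not RxPY code; the names `where_am_i`, `background`, and the `pool` thread-name prefix are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def where_am_i():
    """Return the name of the thread that runs this function."""
    return threading.current_thread().name


results = {}

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="pool") as pool:

    def background():
        # Runs on the background thread, like the callback in the question.
        results["callback"] = where_am_i()
        # Handing the work to the pool moves it onto the pool's own worker.
        results["handler"] = pool.submit(where_am_i).result()

    t = threading.Thread(target=background, name="background")
    t.start()
    t.join()

print(results)
```

The hand-off works because the pool's worker is already blocked waiting for tasks. A plain main thread has no such waiting loop unless the program provides one.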

Is there something I can use to schedule work back on the main thread? For example: ops.observe_on(MainThreadScheduler()). This seems to be quite simple to do in C# and Java.

To be clear, the output I am after is:

Before stream: MainThread
In callback: Thread-7
In subscription: MainThread

matiboy commented 1 year ago

Hi @alek5k

Looks like the issue is specific to the Subject here.

Edit: the repro below is not relevant, because every example that prints "MainThread" is actually calling on_next from the main thread. Sorry, please ignore it.

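import threading
from reactivex import Subject, of, timer
from reactivex import operators as ops
from reactivex.scheduler import CurrentThreadScheduler

thread_to_execute_on = CurrentThreadScheduler()
def print_thread(prefix: str):
    return lambda *x: print(prefix, threading.current_thread().name, x)
of(1, 2, 3).pipe(ops.observe_on(thread_to_execute_on)).subscribe(print_thread("OF"))
# => OF MainThread (1,) ; OF MainThread (2,) ; OF MainThread (3,)
timer(2).subscribe(on_next=print_thread("TIMER"), scheduler=thread_to_execute_on)
# => Do not schedule blocking work! <- warning ; TIMER MainThread (0,)
my_subject = Subject()
my_subject.pipe(ops.observe_on(thread_to_execute_on)).subscribe(print_thread("SUBJECT"))
# on_next is called from a background thread
threading.Thread(target=lambda: my_subject.on_next("hello")).start()
# => SUBJECT Thread-1 ('hello',)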

That does appear to be a bug; will look into it.

matiboy commented 1 year ago

Sorry, my comment above was not a valid repro at all. After looking at it again: when on_next is called from another thread, CurrentThreadScheduler() runs the work on that other thread, not on the thread where the CurrentThreadScheduler was created.

This is actually stated in the docs:

Each instance manages a number of trampolines (and queues), one for each thread that calls a schedule method

It can also be seen in the code, where the trampoline is looked up (and created if missing) for the current_thread at the time schedule is called, not when the scheduler is created:

def get_trampoline(self) -> Trampoline:
    thread = current_thread()
    tramp = self._tramps.get(thread)
    if tramp is None:
        tramp = Trampoline()
        self._tramps[thread] = tramp
    return tramp
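
To make the consequence concrete, here is a toy stand-in written with the standard library only. It is not RxPY's implementation: `PerThreadTrampoline` and `record` are hypothetical names, and the real trampoline does more (for example, guarding against re-entrant draining). It only mirrors the one property discussed above, that the queue is chosen by the thread calling `schedule`.

```python
import threading
from collections import deque


class PerThreadTrampoline:
    """Toy model: one work queue per *calling* thread (not RxPY's class)."""

    def __init__(self):
        self._queues = {}
        self._lock = threading.Lock()

    def schedule(self, action):
        # The thread is looked up at call time, not at construction time.
        thread = threading.current_thread()
        with self._lock:
            queue = self._queues.setdefault(thread, deque())
        queue.append(action)
        # Drain right away, on the caller's own thread.
        while queue:
            queue.popleft()()


ran_on = []
creator = threading.current_thread().name
scheduler = PerThreadTrampoline()  # created on the starting thread


def record():
    ran_on.append(threading.current_thread().name)


scheduler.schedule(record)  # scheduled from the creating thread

worker = threading.Thread(target=lambda: scheduler.schedule(record), name="worker")
worker.start()
worker.join()

print(ran_on)  # second entry is "worker", not the creating thread
```

Under this model, nothing ever moves work between threads: whichever thread calls `schedule` also runs the action, which matches the "In subscription: Thread-7" output in the question.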

Will investigate further how to achieve what you were asking for.

alek5k commented 1 year ago

Thanks @matiboy. I had actually forgotten that I raised a similar issue in the past, in this issue.

I guess I never really got to a solution. Scheduling work on the main thread is pretty simple in C# Rx, and it would definitely be nice to have in RxPY.

A lot of the responses seem to assume that you have a UI event loop, which is not always the case. For example, imagine a while loop in the main thread that waits on a user input(), like a CLI.
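
For the no-UI-loop case, one possible workaround is to hand items to the main thread through a thread-safe queue that the main loop drains itself. The sketch below is standard-library only and is not a built-in RxPY scheduler; `inbox`, `STOP`, `deliver`, `producer`, and `main_loop` are hypothetical names. With RxPY, something like `my_subject.subscribe(inbox.put)` could presumably play the role of `deliver`.

```python
import queue
import threading

inbox = queue.Queue()  # thread-safe hand-off point between threads
STOP = object()        # sentinel that tells the main loop to exit


def deliver(value):
    # Stand-in for a subscriber callback invoked on a background thread.
    inbox.put(value)


def producer():
    for i in range(3):
        deliver(i)
    inbox.put(STOP)


handled = []


def main_loop():
    while True:
        item = inbox.get()  # blocks until a background thread delivers
        if item is STOP:
            break
        # This line runs on whichever thread called main_loop().
        handled.append((item, threading.current_thread().name))


loop_thread = threading.current_thread().name
worker = threading.Thread(target=producer, name="producer")
worker.start()
main_loop()
worker.join()

print(handled)
```

One caveat for the CLI example: a loop that is blocked inside input() cannot drain the queue until input() returns, so the loop would have to poll with `inbox.get_nowait()` between prompts, or read user input on a separate thread and feed it into the same queue.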