ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

Why does RxPY keep randomly throwing 'NoneType' object has no attribute 'dispose'? #88

Closed: mchen402 closed this issue 8 years ago

mchen402 commented 8 years ago

Minimal working example: see the SO post.

The issue documented in the post above seems to be caused by the following code in rx.concurrency.currentthreadscheduler:

if not self.queue:
    self.queue = Trampoline(self)

This is not thread-safe, yet the single instance created by Scheduler.current_thread = current_thread_scheduler = CurrentThreadScheduler() is shared across threads and is used asynchronously by the LINQ operators. So should we use a lock, as below?

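from rx import Lock

class CurrentThreadScheduler(Scheduler):

    def __init__(self):
        ...
        self.lock = Lock()

    def schedule_relative(self, duetime, action, state=None):
        ...
        # Check for an existing queue and create one atomically, so two
        # threads cannot both decide that they own the trampoline.
        with self.lock:
            has_queue = self.queue
            if not has_queue:
                self.queue = Trampoline(self)

        if has_queue:
            self.queue.enqueue(si)
        else:
            # The queue was already created under the lock above; creating
            # it a second time here would discard anything other threads
            # enqueued in the meantime.
            try:
                self.queue.enqueue(si)
                self.queue.run()
            finally:
                self.queue.dispose()
                self.queue = None
        ...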

On a further note, self.queue.dispose() also seems unsafe. What if one thread has just called self.queue.enqueue(si) while another is about to call self.queue.dispose()? The queued item would be dropped silently.
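A minimal, stdlib-only sketch of the lock idea carried one step further, so that the enqueue/dispose race described above cannot drop items. It assumes a single queue shared by all threads; LockedTrampoline and its methods are hypothetical names for illustration, not RxPY code. The key point is that appending, the "is anyone already draining?" check, and the "queue is empty, stop draining" decision all happen under the same lock.

```python
import threading
from collections import deque


class LockedTrampoline:
    """Hypothetical sketch: one queue shared by all threads, guarded by one lock.

    Enqueueing, the "is anyone draining?" check and the "queue is empty,
    stop draining" step all happen under the same lock, so an item can
    never be added to a queue that another thread is about to tear down.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._queue = deque()
        self._draining = False  # True while some thread owns the drain loop

    def schedule(self, action):
        with self._lock:
            self._queue.append(action)
            if self._draining:
                return  # the draining thread will pick this action up
            self._draining = True
        self._drain()

    def _drain(self):
        try:
            while True:
                with self._lock:
                    if not self._queue:
                        # Decided under the lock, so no concurrent append is lost.
                        self._draining = False
                        return
                    action = self._queue.popleft()
                # Run outside the lock so an action can schedule more work.
                action()
        except BaseException:
            with self._lock:
                self._draining = False  # let a later schedule() restart draining
            raise
```

Nested calls are trampolined rather than recursed: an action scheduled from inside another action runs after the outer one returns. The trade-off of this design is that an action scheduled from thread B may run on thread A (whichever thread is draining), so it no longer has "current thread" semantics.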

dbrattli commented 8 years ago

The CurrentThreadScheduler is a single-threaded scheduler and should not be mixed with multi-threaded code. The problem with the code is that .interval() uses the TimeoutScheduler by default, and timeouts spawn new threads. Observable.just uses the CurrentThreadScheduler by default, but the CurrentThreadScheduler does not support timeouts, so you cannot use it with interval(). You should probably use the NewThreadScheduler or the EventLoopScheduler to run on a designated thread. Make sure all operators use the same scheduler to avoid problems. I agree that this is a bit of a mess, and there should be an easier way to set the scheduler to be used by all operators for a given subscription instead of having to give it as an argument to all the time-dependent or scheduling operators.
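A small illustration of "timeouts spawn new threads", assuming a timeout scheduler built on Python's threading.Timer (treat that as an assumption about RxPY's TimeoutScheduler, not a confirmed detail). The helper timer_thread_ids is hypothetical; it only shows that a Timer callback runs on a different thread from the caller, which is exactly what a single-threaded scheduler is not prepared for.

```python
import threading


def timer_thread_ids(delay: float = 0.01):
    """Return (caller thread id, callback thread id) for one threading.Timer."""
    caller = threading.get_ident()
    seen = []
    timer = threading.Timer(delay, lambda: seen.append(threading.get_ident()))
    timer.start()
    # Timer is a Thread subclass, so join() waits until the callback has run.
    timer.join()
    return caller, seen[0]
```

Every periodic tick driven this way lands on a timer thread, so anything it touches (such as a scheduler's shared queue) is touched from more than one thread.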

mchen402 commented 8 years ago

Thanks again Dag.

"there should be an easier way to set the scheduler to be used by all operators for a given subscription instead of having to give it as an argument to all the time-dependent or scheduling operators."

^ I'd love to see that prioritised.

dbrattli commented 8 years ago

Yes, I'm thinking about cleaning up the scheduler part, and perhaps keeping only two or three native schedulers for Python: a ThreadPool scheduler (not implemented yet) with an EventLoop for each thread, and an Asyncio scheduler. The other ones are just confusing for most people. ThreadPool should probably be the default, and it should be easy to switch everything to Asyncio without having to add a parameter to every operator, either through the config environment or by adding a scheduler option to subscribe() that sets the default scheduler for the subscription.
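A toy sketch of the second option above: a scheduler passed to subscribe() becomes the default for every operator in the chain, while an explicit per-operator scheduler still wins. Observable, just, map and immediate here are hypothetical stand-ins written for illustration only; they are not RxPY's API, and a "scheduler" is simplified to a function that runs an action.

```python
from typing import Callable, Optional

Action = Callable[[], None]
Scheduler = Callable[[Action], None]  # hypothetical: a scheduler just runs an action


def immediate(action: Action) -> None:
    action()


class Observable:
    """Toy observable used only to illustrate a subscription-wide default scheduler."""

    def __init__(self, subscribe_fn: Callable[[Callable[[object], None], Scheduler], None]):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next: Callable[[object], None],
                  scheduler: Optional[Scheduler] = None) -> None:
        # The scheduler given here becomes the default for every operator in the chain.
        self._subscribe_fn(on_next, scheduler or immediate)

    def map(self, fn: Callable[[object], object]) -> "Observable":
        # map does no scheduling itself; it just forwards the default downstream.
        return Observable(
            lambda on_next, default: self._subscribe_fn(lambda x: on_next(fn(x)), default)
        )


def just(value: object, scheduler: Optional[Scheduler] = None) -> Observable:
    def subscribe_fn(on_next, default):
        # An explicit per-operator scheduler overrides the subscription default.
        (scheduler or default)(lambda: on_next(value))
    return Observable(subscribe_fn)
```

With this shape, just(20).map(lambda x: x + 1).subscribe(print, scheduler=some_scheduler) routes the emission through some_scheduler without just() or map() ever being given a scheduler argument.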

mchen402 commented 8 years ago

"You should probably go for the NewThreadScheduler or the EventLoopScheduler to run on a designated thread."

Based on the above, I tried:

from rx import Observable
from rx.internal.basic import noop
from rx.disposables import CompositeDisposable
from rx.concurrency import NewThreadScheduler

scheduler = NewThreadScheduler()

# Every second, map the tick to an inner observable that emits it on `scheduler`.
observables = Observable.interval(1).map(lambda x: Observable.just(x, scheduler))

# Ten subscriptions, each switching to the latest inner observable.
disposable = CompositeDisposable([observables.switch_latest().subscribe(noop) for _ in range(10)])

But I still get the same errors as before.

AttributeError: 'NoneType' object has no attribute 'dispose'

I also tried EventLoopScheduler, but I still get the same errors. Am I doing what you meant?
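The AttributeError above is what one unlucky interleaving of the check/create/run/dispose sequence quoted at the top of this issue would produce when a single queue attribute is shared by two threads. The sketch below replays that interleaving deterministically, without real threads: each `yield` marks a point where the OS could switch threads. Trampoline, SharedScheduler, schedule_steps and replay_unlucky_interleaving are hypothetical, simplified stand-ins, not RxPY code.

```python
class Trampoline:
    """Stand-in for the real trampoline; only dispose() matters here."""

    def dispose(self):
        pass


class SharedScheduler:
    """Hypothetical model: one `queue` attribute shared by every thread."""

    def __init__(self):
        self.queue = None


def schedule_steps(sched):
    """Check / create / run / dispose, split at possible thread switches."""
    if not sched.queue:
        yield  # preempted right after the check
        sched.queue = Trampoline()
        try:
            yield  # enqueue() and run() would execute actions here
        finally:
            sched.queue.dispose()  # disposes whatever sched.queue holds *now*
            sched.queue = None


def replay_unlucky_interleaving():
    sched = SharedScheduler()
    a, b = schedule_steps(sched), schedule_steps(sched)
    next(a)        # A: sees no queue
    next(b)        # B: also sees no queue
    next(a)        # A: creates its trampoline
    next(b)        # B: overwrites sched.queue with its own trampoline
    next(b, None)  # B: finishes, disposes it, sets sched.queue = None
    try:
        next(a)    # A: its finally-block now calls dispose() on None
    except AttributeError as exc:
        return str(exc)
    return None
```

Calling replay_unlucky_interleaving() returns "'NoneType' object has no attribute 'dispose'", the same message reported in this issue. Note that A's trampoline is also overwritten without ever being disposed, so anything enqueued on it would be lost.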

dbrattli commented 8 years ago

Yes, almost. You also need to pass the scheduler to interval(1). But even that didn't help, so there must be a bug in the code. It looks like I was wrong when I said that CurrentThreadScheduler should not be mixed with multi-threaded code: it's actually not possible to get rid of it, since the subscribe() logic uses it even if you pass schedulers as arguments to all operators. I've added a fix that makes sure each thread gets its own queue on the CurrentThreadScheduler. So it's still a "current thread" scheduler, but the queue is now per thread. NewThreadScheduler will run out of threads if you use it here, so this is a good case for creating a ThreadPoolScheduler. But the example should now work without specifying any schedulers. The fix is checked into develop, so please test it.
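One way to give each thread its own trampoline queue is to keep the queue in a threading.local, so no thread can see, overwrite, or dispose another thread's queue and no lock is needed. This is a minimal sketch under that assumption; PerThreadTrampolineScheduler is a hypothetical name, and the fix checked into develop may be implemented differently.

```python
import threading
from collections import deque


class PerThreadTrampolineScheduler:
    """Hypothetical sketch: a current-thread scheduler with one queue per thread."""

    def __init__(self):
        # Attributes set on a threading.local are visible only to the thread that set them.
        self._local = threading.local()

    def schedule(self, action):
        queue = getattr(self._local, "queue", None)
        if queue is not None:
            # Already trampolining on *this* thread: defer instead of recursing.
            queue.append(action)
            return
        queue = self._local.queue = deque([action])
        try:
            while queue:
                queue.popleft()()
        finally:
            # Only this thread ever touches its own queue, so this reset cannot
            # race with another thread. Items left over after an error are discarded.
            self._local.queue = None
```

Unlike a single shared queue, every action here runs on the thread that scheduled it, which preserves the "current thread" semantics while making concurrent use from several threads safe.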

lock[bot] commented 5 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.