ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.81k stars 361 forks source link

Disposing subscription does not cancel Futures #141

Closed rgbkrk closed 7 years ago

rgbkrk commented 7 years ago
import rx
import concurrent.futures
import time

pp = concurrent.futures.ProcessPoolExecutor(3)

def slow_add(a,b):
    time.sleep(3)
    print(a, "+", b, "=", a + b)

sub = rx.Observable.merge(
  pp.submit(slow_add, 1, 2),
  pp.submit(slow_add, 3, 4)
).subscribe(print)

time.sleep(0.1)

sub.dispose()

Will end up printing. Same thing if I wrap the futures with a call to rx.Observable.from_future.

How would I submit work to a process pool that can be cancelled when a subscription is disposed?

dbrattli commented 7 years ago

Disposing is usually best (worst) effort, but in this particular case I was surprised that they were not cancelled, and if so it would be a bug. But digging into this I can see that RxPY is trying to cancel the futures:

➜  RxPY-reactivex git:(develop) ✗ python3 cancel.py
merge_all().subscribe()
from_future().dispose()
cancel() <Future at 0x1096735c0 state=running>
Call is currently being executed and cannot be cancelled.
from_future().dispose()
cancel() <Future at 0x109673b38 state=running>
Call is currently being executed and cannot be cancelled.
1 + 2 = 3
3 + 4 = 7

From concurrent.Futures, cancel(). Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

I can see that RxPY is getting a False back from cancel(), so at least it's trying to cancel. You might want to search the net for discussions about concurrent.Futures and cancelling and see if you can find out why the processes are not killed.

lock[bot] commented 5 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.