ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.81k stars 361 forks source link

AsyncIOScheduler.schedule is not thread safe! #708

Closed fanck0605 closed 1 year ago

fanck0605 commented 1 year ago

Describe the bug I tried to switch a thread to AsyncIOScheduler by observe_on operator and call asyncio Future. set_result() method, but the result does not meet expectations, set_result will not be executed.

I checked rxpy's source code and found that AsyncIOScheduler called loop.call_soon, which is non-threadsafe, instead of loop.call_soon_threadsafe? This is a concurrent library, but not thread safe? After so long development, multiple thread safety issues can still be discovered during use. Can this library really stable, and be used in production?

To Reproduce

asyncio.Future completed will never be done!

completed = loop.create_future()
(
reactive.create(source)
.pipe(
ops.map(do_something),
ops.observe_on(asyncio_scheduler),
)
.schedule(
on_complete=lambda: completed.set_result(None),
scheduler=another_scheduler
)
)
await completed

Expected behavior asyncio.Future completed can be done on_completed