syrusakbary / promise

Ultra-performant Promise implementation in Python
MIT License
362 stars 76 forks source link

Update drain_queues to check if queue has already been drained #49

Open johannesfj opened 6 years ago

johannesfj commented 6 years ago

I'm seeing some exception log messages when using the GeventScheduler.

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/gevent/greenlet.py", line 536, in run
    result = self._run(*self.args, **self.kwargs)
  File "/usr/lib/python3/dist-packages/promise/async_.py", line 111, in drain_queues
    assert self.is_tick_used
AssertionError
Fri Dec 15 23:06:07 2017 <Greenlet at 0x7f81be5f9f20: <bound method Async.drain_queues of <promise.async_.Async object at 0x7f81c164f1d0>>> failed with AssertionError

This issue can be reproduced with this code:

from promise import Promise
from promise.schedulers.gevent import GeventScheduler
from promise import set_default_scheduler
import gevent

set_default_scheduler(GeventScheduler())
gevent.spawn(lambda: Promise.resolve(None).then().get()).join()

The problem occurs when then is called on a promise followed by a get.

then queues a job on the gevent event loop to settle the target promise using:

async_instance.invoke(
    partial(target._settle_promise, promise,
            handler, value, traceback),
)

When this happens, target._settle_promise is put on the processing queue and async_.py::Async::queue_tick is called. That in turn puts async_.py::Async::drain_queues on the gevent event loop.

Nothing has yielded to the gevent event loop yet, so the main greenlet is still running. When get is called on the promise async_.py::Async::wait is called, that drains the queues using drain_queue_until_resolved. The promise is now settled since partial(target._settle_promise, promise, handler, value, traceback) was on the async queue. This also flips the flag indicating that the tick has been used and the queues are empty.

When get returns the settled value of the promise chain the greenlet will eventually yield to the gevent event loop and async_.py::Async::drain_queues will run. An assertion error will be thrown since the queue has already been processed.

I've fixed this by removing the assert check and just checking if the tick has already been processed or not in async_.py::Async::drain_queues.

An alternative to this fix is to remove the trampoline logic or disable it when used with gevent, although I think this would happen with most other async libs as well. Removing the trampoline logic would result in work being put on the event loop directly instead of executing in a single job when the queue is drained in async_.py.