pombreda / gevent

Automatically exported from code.google.com/p/gevent

async watcher wait() blocks forever if send() occurs first #130

Closed GoogleCodeExporter closed 9 years ago

GoogleCodeExporter commented 9 years ago
What steps will reproduce the problem?

Python 2.7.2+ (default, Oct  4 2011, 20:06:09) 
[GCC 4.6.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import gevent
>>> gevent.__version__
'1.0b2'
>>> h = gevent.get_hub()
>>> a = h.loop.async()
>>> a.send()
>>> h.wait(a) # This blocks forever

What is the expected output? What do you see instead?

My expectation is that a call to hub.wait() on an async watcher that has 
already had send() called will return immediately; instead, it never returns.

In a program I'm working on, an on_ready() method either registers a callback 
to be executed later, once an object becomes ready (on another thread), or, if 
the object is already ready, executes the callback immediately. So the 
callback, and with it the send() on the async watcher, may run before our code 
reaches the wait() call after on_ready() returns.

A simplified, stripped down version of what we're doing:

class Future(object):

    ...

    def get(self):
        ev = ThreadEvent() # Our wrapper that, with gevent, is implemented with an async watcher
        def get_cb(self):
            try:
                # Extracts the actual value of this Future object
                self._value = ...
            except Exception:
                pass  # Handles errors
            finally:
                ev.set() # Would ideally just be async.send()
        self.on_ready(get_cb) # Either calls get_cb() immediately or registers it to be called
        ev.wait()
        return self._value

If our ThreadEvent is just a simple wrapper around an async watcher, get_cb() 
may run before self.on_ready() returns, which means ev.set() (and thus a send() 
on the async watcher) happens before ev.wait() (which is a wait() on the async 
watcher).
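For comparison, here is a minimal sketch of the same get() pattern using the standard library's threading.Event in place of the async-watcher-backed ThreadEvent. threading.Event latches its flag, so a set() that runs inside on_ready() before wait() is not lost. SimpleFuture and its on_ready()/set_result() methods are hypothetical stand-ins for the Future above. The sketch uses OS threads, not greenlets, so it only illustrates the semantics being asked for, not gevent's behavior.

```python
import threading


class SimpleFuture(object):
    """Hypothetical stand-in for the Future class in the report."""

    def __init__(self):
        self._lock = threading.Lock()
        self._ready = False
        self._result = None
        self._callbacks = []

    def set_result(self, result):
        # Mark the future ready, then run callbacks registered so far.
        with self._lock:
            self._result = result
            self._ready = True
            callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb()

    def on_ready(self, cb):
        # Run cb immediately if already ready, otherwise register it.
        with self._lock:
            if not self._ready:
                self._callbacks.append(cb)
                return
        cb()

    def get(self):
        ev = threading.Event()  # latched: a set() before wait() is remembered

        def get_cb():
            ev.set()

        self.on_ready(get_cb)  # may call get_cb() before returning
        ev.wait()              # returns at once if get_cb() already ran
        return self._result


# Already ready: ev.set() runs before ev.wait(), and get() still returns.
f = SimpleFuture()
f.set_result(42)
print(f.get())  # 42

# Not ready yet: another thread completes the future a little later.
g = SimpleFuture()
t = threading.Timer(0.05, g.set_result, args=("late",))
t.start()
print(g.get())  # late
t.join()
```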

This problem also exists in the example code for using async watchers 
(https://bitbucket.org/denis/gevent/src/b47efed2ec45/examples/multiple_threads.py). 
There's no guarantee that the _run() function passed to start_new_thread() 
(on line 23) will be scheduled after the call to hub.wait() on line 24. If the 
async.send() on line 16 executes first, the program will deadlock.

We have worked around this by waiting on the async watcher in a new greenlet 
immediately after creating it, and implementing wait() as a join() of that 
greenlet rather than a wait on the async watcher. This works, but it is 
unnecessarily slow.

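import gevent
import gevent.event

class ThreadEvent(object):
    def __init__(self):
        self.async = gevent.get_hub().loop.async()
        self.event = gevent.event.Event()
        self.waiter = gevent.spawn(self._waiter)
        # Wait until _waiter is running, so the watcher is already active
        # before anyone can call set()
        self.event.wait()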

    def _waiter(self):
        self.event.set()
        gevent.get_hub().wait(self.async)

    def set(self):
        self.async.send()
        self.async = None

    def wait(self):
        self.waiter.join()

If a send() that happens before the wait() on an async watcher could be relied 
upon, a wrapper around Event that works reliably in a mixed greenlet/thread 
environment could instead look like:

class ThreadEvent(object):
    def __init__(self):
        self.async = gevent.get_hub().loop.async()

    def set(self):
        self.async.send()

    def wait(self):
        gevent.get_hub().wait(self.async)

What version of the gevent are you using?

1.0b2 (also tested on mercurial tip identifying as 1.0b3)

On what operating system?

Ubuntu 11.10 x64

On what Python?

2.7.2

Please provide any additional information below.

Original issue reported on code.google.com by ipet...@gmail.com on 16 May 2012 at 3:39

GoogleCodeExporter commented 9 years ago
Try this sequence (it worked for me with gevent 1.0b2 on Mac OS X):

h = gevent.get_hub()
a = h.loop.async()
a.start(lambda: None)
a.send()
h.wait(a) # Should return immediately

After this, a is no longer active. Call a.start(...) again to reactivate it 
so that the loop will pay attention to it next time. a.start(...) also resets 
a's pending state. You can always test a.pending before calling a.start(...) 
if that matters to your logic.
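As a rough illustration of the comment above, the pure-Python state machine below models why the original sequence hangs and why starting the watcher first helps. ToyAsync and toy_wait() are hypothetical and are not gevent's implementation. The model assumes three things: start() on an inactive watcher clears any pending send (as the comment says); start() on an already-active watcher only replaces the callback; and hub.wait() amounts to "start, wait for the callback, stop".

```python
class ToyAsync(object):
    """Hypothetical model of an async watcher's active/pending flags."""

    def __init__(self):
        self.active = False
        self.pending = False
        self.callback = None

    def start(self, callback):
        if not self.active:
            # Assumption: starting an inactive watcher discards an earlier send().
            self.pending = False
            self.active = True
        # Assumption: on an active watcher, start() only swaps the callback.
        self.callback = callback

    def stop(self):
        self.active = False

    def send(self):
        # Record the send; it is only delivered while the watcher is active.
        self.pending = True

    def run_loop_once(self):
        # One loop iteration: deliver a pending send to an active watcher.
        if self.active and self.pending:
            self.pending = False
            self.callback()


def toy_wait(watcher):
    """Model of hub.wait(): start, run the loop once, then stop.

    Returns True if the watcher fired, False where a real wait would block.
    """
    fired = []
    watcher.start(lambda: fired.append(True))
    try:
        watcher.run_loop_once()
    finally:
        watcher.stop()
    return bool(fired)


# Original report: the send() happens while the watcher is inactive.
a = ToyAsync()
a.send()
print(toy_wait(a))  # False: the real hub.wait(a) would block forever

# Suggested sequence: start the watcher first so the send() is kept.
b = ToyAsync()
b.start(lambda: None)
b.send()
print(toy_wait(b))  # True
print(b.active)     # False: call b.start(...) again before the next wait
```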

If you were doing the h.wait in a loop, I think the basic idea would be:

a = h.loop.async()
a.start(lambda: None)
<pass 'a' to some other greenlet or thread>

while True:
    h.wait(a)
    # reactivate the async watcher now to avoid potential race condition
    a.start(lambda: None)

    # Process...
    <do all the required processing in response to 'a' being signaled; e.g., drain the *entire* queue that was filled by thread>
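The same "reset the signal first, then drain everything" ordering can be sketched with OS threads and the standard library. Here threading.Event and queue.Queue stand in for the async watcher and the thread-filled queue; this is an analogue, not gevent code, and SENTINEL is a hypothetical end-of-stream marker. The consumer clears the event before draining. An item put after the clear is then either picked up by the current drain or leaves the event set for the next iteration, so no wakeup is lost.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker


def producer(q, ev, items):
    # Runs in another OS thread: enqueue each item, then signal.
    for item in items:
        q.put(item)
        ev.set()
    q.put(SENTINEL)
    ev.set()


def consumer(q, ev):
    received = []
    while True:
        ev.wait()
        # Reset the signal *before* draining, like calling a.start(...) right
        # after h.wait(a) returns.
        ev.clear()
        # Drain the entire queue, not just one item per wakeup.
        while True:
            try:
                item = q.get_nowait()
            except queue.Empty:
                break
            if item is SENTINEL:
                return received
            received.append(item)


q = queue.Queue()
ev = threading.Event()
t = threading.Thread(target=producer, args=(q, ev, range(1000)))
t.start()
result = consumer(q, ev)
t.join()
print(len(result), result[:3])  # 1000 [0, 1, 2]
```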

Hope this helps.

Original comment by vitaly.k...@gmail.com on 2 Aug 2012 at 7:37

GoogleCodeExporter commented 9 years ago
Migrated to http://github.com/SiteSupport/gevent/issues/130

Original comment by Denis.Bi...@gmail.com on 14 Sep 2012 at 10:53