Closed 0zeroth closed 5 years ago
As a comment to my question: I'm reading with interest the tickets like https://github.com/python-trio/trio/issues/467. At least for me, wrapping existing api's is an extremely common task. A collection of simple tutorials along the lines of 'given an old-school API like this, here is what we consider best-practice to wrap it, with reasons why' would be an invaluable tool for at least two reasons:
TrioApi
like a task or maybe a context to ensure stop
is always called)trio
api should look like. Even if we're lucky enough to only need deal with trio primatives, we could still cobble together an api that behaved pretty badly!I was hoping to turn my experiments with this API into some sort of tutorial - but since I fell at the first hurdle it seems I've got a bit of a way to go!!
The stop function is badly behaved - it consistently causes the
_callback_threadsafe
to be called before it returns, and will not return until_callback_threadsafe
returns. So it decides to act synchronously despite being documented otherwise. As far as I can tell, this causes a deadlock because trio is waiting forApi.stop()
to finish, which is waiting forBadApi.stop()
, which is waiting for the_callback
to finish, which is waiting forBlockingTrioPortal.run_sync
to finish. Butrun_sync
cannot finish, because trio never gets the chance to schedule_callback
.
Let's see if I understand right: your theory is that
TrioApi.stop
, you synchronously call self._api.stop()
self._api.stop
synchronously calls self._callback_threadsafe(...)
self._callback_threadsafe
synchronously calls self._portal.run_sync(self._callback, ...)
Did I follow right?
That would be a problem, yeah... and doing self._portal.run_sync(nursery.start_soon, ...)
wouldn't help, because the deadlock happens as soon as you call self._portal.run_sync
, so it wouldn't matter which callback you pass. Any call to portal.run
or portal.run_sync
from the main Trio thread would cause an instant deadlock, guaranteed.
The weird thing though, is that we thought of this, so run
and run_sync
actually check for this case, and raise an error if you try to call them from the main Trio thread:
In [1]: import trio
In [2]: async def deadlocker():
...: portal = trio.BlockingTrioPortal()
...: portal.run_sync(lambda: None)
In [3]: trio.run(deadlocker)
Traceback (most recent call last):
[...]
File "<stdin>", line 3, in deadlocker
File "/home/njs/.user-python3.6/lib/python3.6/site-packages/trio/_threads.py", line 121, in run_sync
return self._do_it(self._run_sync_cb, fn, *args)
File "/home/njs/.user-python3.6/lib/python3.6/site-packages/trio/_threads.py", line 82, in _do_it
"this is a blocking function; call it from a thread"
RuntimeError: this is a blocking function; call it from a thread
So I guess that's the first thing to figure out... why aren't you seeing this error? If self._api.stop()
is really calling self._callback_threadsafe
directly in the main Trio thread, then self._callback_threadsafe
should be immediately raising RuntimeError
. So... is your badly behaved API so badly behaved that when a callback raises an exception, it swallows the exception and deadlocks? Or is there something else going on, that causes the error to not be generated in the first place...?
I'd start by adding some more debugging to _callback_threadsafe
. Maybe something like:
def _callback_threadsafe(self, , *args):
print('_callback_threadsafe', *args)
print('in thread:', threading.current_thread())
try:
self._portal.run_sync(self._callback, *args)
except:
print("run_sync raised:", sys.exc_info())
raise
else:
print("run_sync returned successfully")
...and then see what that says.
Not quite: self._callback_threadsafe(...)
is called from another thread (call it the API thread) (I have all the logging you suggest in my production code, but stripped it to make the example readable). But it just so happens self._callback_threadsafe(...)
is called consistently before self._api.stop()
returns. So, as far as I can tell, the trio
thread is stuck waiting for self._api.stop()
to return, but the API thread is stuck waiting for portal.run_sync
to finish (and it won't, because trio never gets the chance to schedule the function).
In addition, I tried putting the following call chain (the logger is configured to log the thread):
def _callback_threadsafe(self, *args):
self.log.debug('_callback_threadsafe')
self._portal.run(self._callback_nursery, *args)
self.log.debug('_callback_threadsafe finished')
async def _callback_nursery(self, *args):
await trio.sleep(0.)
self.log.debug('_callback_nursery')
self._nursery.start_soon(self._callback, *args)
async def _callback(self, event: blpapi.Event, session: blpapi.Session):
await trio.sleep(0.)
self.log.debug('_callback')
In this case _callback_nursery
never gets called - so it appears to be the same problem - the blocking portal seems to require trio has the opportunity to schedule the task on before it can complete.
I figured out a reproducible example:
import time
import trio
import threading
import logging
log = logging.getLogger('demo')
class BadApi:
"""
__init__ with a callback that will be called at arbitrary times from arbitrary threads
start() must be called first, then AFTER callback('started') has been called,
further calls can be made
request(id, payload) will cause callback('response', id, response_payload) to be called
stop() must be called, and then AFTER callback('stoppped') has been called we know all threads
have been killed. (failing to call stop causes app to hang because threads are not dead)
"""
def __init__(self, callback):
self._callback = callback
def _do_callback(self, delay, args):
time.sleep(delay)
self._callback(*args)
def start(self):
# delay so we are sure this function finished first
thread = threading.Thread(target=self._do_callback, args=(1., ('started',)))
thread.start()
def stop(self):
thread = threading.Thread(target=self._do_callback, args=(0., ('stopped',)))
thread.start()
thread.join() # require callback finishes first
class TrioApi:
def __init__(self):
self._api = BadApi(self._callback_threadsafe)
self._started = trio.Event()
self._stopped = trio.Event()
self._portal = None
async def start(self):
self._portal = trio.BlockingTrioPortal()
log.debug(f'_api.start()')
self._api.start() # This returns BEFORE _callback_threadsafe executes on another thread
log.debug(f'_api.start() finished')
await self._started.wait()
async def stop(self):
log.debug(f'_api.stop()')
self._api.stop() # This does not return - we end up stuck in _callback_threadsafe
log.debug(f'_api.stop() finished')
await self._stopped.wait()
def _callback_threadsafe(self, *args):
log.debug('_callback_threadsafe')
self._portal.run_sync(self._callback, *args)
def _callback(self, msg_type, id=None, payload=None):
log.debug(f'_callback {msg_type}')
if msg_type == 'started':
self._started.set()
elif msg_type == 'stopped':
self._stopped.set()
else:
pass # Handle the response
async def main():
api = TrioApi()
await api.start()
await trio.sleep(1.)
await api.stop()
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(threadName)s %(name)s: %(message)s")
trio.run(main)
Output:
2019-04-05 14:57:06,649 [DEBUG] MainThread demo: _api.start()
2019-04-05 14:57:06,650 [DEBUG] MainThread demo: _api.start() finished
2019-04-05 14:57:07,650 [DEBUG] Thread-1 demo: _callback_threadsafe
2019-04-05 14:57:07,650 [DEBUG] MainThread demo: _callback started
2019-04-05 14:57:08,652 [DEBUG] MainThread demo: _api.stop()
2019-04-05 14:57:08,652 [DEBUG] Thread-2 demo: _callback_threadsafe
Ah, so when you call self._api.stop()
, that delegates some worker thread to call the callback, but then self._api.stop()
blocks the main thread waiting for the worker thread to report that the callback has completed? Oof, yeah, that would indeed subvert Trio's attempts to protect you from deadlock!
So FYI, you left in some type hints there that reveal the identity of BadApi
– let me know if you need that stripped from the comment/comment history. This is very helpful for giving advice though :-). Assuming I have the right library, it looks like it actually provides two different methods for stopping the session. There's stop
:
Stop operation of this session and block until all callbacks to EventHandler objects relating to this Session which are currently in progress have completed (including the callback to handle the SESSION_STATUS Event with SessionTerminated message this call generates).
And then there's stopAsync
:
Begin the process to stop this Session and return immediately. The application must monitor events for a SESSION_STATUS Event with SessionTerminated message which will be generated once the Session has been stopped.
So it sounds like your problem is, right now your code is designed around the stopAsync
semantics, but you're calling stop
:-). So your options are:
If you keep using stop
, which is documented as a blocking operation, then you should push it off into a worker thread to avoid blocking the main Trio thread: await trio.run_sync_in_worker_thread(self._api.stop)
. In this case you can remove the _stopped
event, because the self._api.stop
call won't return until after the shutdown is complete.
OR, you should use stopAsync
, which is documented to return immediately, and then wait for the _stopped
event like you are now.
Those docs also describe a distinction between start
and startAsync
that you might want to look into.
Also I should probably mention that I'm also available as a consultant for help and advice, in case you'd find it easier to show me actual code under NDA...
No, no secrets its the Bloomberg API, but that code isn't useful when asking for help as people can't run it without a paid up license! That's why I attempted to strip out the distraction... the only problem is you've run ahead and read the docs (thanks!!!) but in my prod code I am calling Bloombergs stopAsync and it behaves as per my reproducible example. I was also surprised - but digging into the asyncio
wrapper I have it works the same, although asyncio
doesn't care because call_soon_threadsafe
returns immediately. My fault for not spending the time creating a minimal working example in the first instance.
So, I think the runnable toy example fully captures the problem, but maybe its too simple as it is deterministic. What I'm more concerned about is the general case. If an API will schedule the callback on another thread, we can't in general assume the triggering function will complete before the callback (or can we, correct me if I'm wrong, I would love to be wrong :) For example:
def bad_api_do_request():
self._internal_do_request(); # This triggers the callback on another thread at some time in the future
self._internal_do_bookeeping(); # What if this takes 1ms? 10ms? 100ms? 1000ms?
I might be missing the point, but even a 'non-blocking' function takes some time to complete, and during that time python may decide to schedule the other thread to do some work, so maybe we'll be unlucky and the callback will trigger.
I'm not worried about a single 'bad' function like my stop
example - it can be worked around. But it seems to hint there is something broken in the pattern being used to wrap this api in the general case.
Ok - TrioToken.run_sync_soon
was exactly what I was after. Apart from being a little hard to find (being in hazmat, which is understandable) the documentation was perfect. Below is a complete working example of wrapping a toy api that (I think) is representative of a real-life callback based API. What I really like is that any errors reliably crash the application. Despite my best efforts, I'm not confident my asyncio
wrapped implementations of similar api's do that...
I also like the way I ended up being forced to inject a Nursery
. I had been thinking it would make some sort of sense (suppose there were many different functions like request
, they should do all their work in the same nursery, and by injecting it the user of TrioApi
gets control) but I was going to 'do it later'. It's really nice that Trio forced me towards better design right from the beginning!
This example doesn't even consider cancellation, but if I update it I'll come back and add some edits. Hopefully someone finds this useful!
import random
import time
import trio
import trio.hazmat
import threading
import logging
log = logging.getLogger('demo')
class BadApi:
"""
__init__ with a callback that will be called at arbitrary times from arbitrary threads
start() must be called first, then AFTER callback('started') has been called,
further calls can be made
request(request_id, i) will cause
multiple callback('partial_response', id, i) followed by a
final callback('response', id, i)
stop() must be called, and then AFTER callback('stoppped') has been called we know all threads
have been killed. (failing to call stop causes app to hang because threads are not dead)
"""
def __init__(self, callback):
self._callback = callback
self._started = False
self.log = log.getChild('BadApi')
def start(self):
self.log.debug('start')
def _reply():
self.log.debug(f'_reply started')
self._started = True
self._callback('started')
self.log.info('started')
thread = threading.Thread(target=_reply)
thread.start()
def stop(self):
# This implementation has a nasty gotcha: it is async in the sense that the callback happens on another
# thread, but it just to happens that callback _always_ happens before this function returns
# This mimics a real api: it is often the case that an API might do things synchronously if it is able
self.log.debug('stop')
assert self._started
def _reply():
self.log.debug(f'_reply stopped')
self._callback('stopped')
self.log.info('stopped')
thread = threading.Thread(target=_reply)
thread.start()
thread.join() # require callback finishes first
def request(self, request_id, v):
self.log.debug(f'request {request_id} {v}')
assert isinstance(v, int)
assert self._started
def _reply():
for i in range(0, v):
self.log.debug(f'_reply partial_response {request_id} {i * v}')
self._callback('partial_response', request_id, i * v)
time.sleep(random.uniform(0.01, 1))
self._callback('response', request_id, v * v)
self.log.info(f'request {request_id} complete')
thread = threading.Thread(target=_reply)
thread.start()
self.log.info(f'request {request_id} pending')
class TrioApi:
def __init__(self):
self._api = BadApi(self._callback_threadsafe)
self._started = trio.Event()
self._stopped = trio.Event()
self._trio_token = None
self._nursery = None
self.log = log.getChild('TrioApi')
async def start(self, nursery):
self.log.debug('start')
self._nursery = nursery
self._trio_token = trio.hazmat.current_trio_token()
self._api.start() # self._callback_threadsafe will happen after this call returns.
await self._started.wait()
self.log.info('started')
async def stop(self):
self.log.debug('stop')
self._api.stop() # self._callback_threadsafe will happen before this call returns. Bad Api!
await self._stopped.wait()
self.log.info('stopped')
async def request(self, v):
self.log.debug(f'request {v}')
send_channel, receive_channel = trio.open_memory_channel(10)
def _callback(msg_type, r):
# This happens in a trio system task, so if we don't have the buffer,
# an exception will be raised and crash out of trio.run(...)
# We could think about back-pressure if we thought BadApi would be kind to a slow consumer,
# but it's not immediately obvious how we would create a blocking call to wait for the channel to be ready.
self.log.debug(f'request _callback sending {r}')
send_channel.send_nowait(r)
if msg_type == 'response':
self._nursery.start_soon(send_channel.aclose)
self.log.info('request complete')
self._api.request(_callback, v)
self.log.info('request pending')
# This looks like it would work, but doesn't, as async generators are 'too difficult'?
# Not sure - there are is a lot written about it in the issue tracker.
# To see the problem, uncomment the following code (and remove the return statement),
# and try raising an error in the client code (see the SIMULATE_ERROR global)
#
#async with receive_channel:
# async for r in receive_channel:
# yield r
return receive_channel
def _callback_threadsafe(self, *args):
# If we use the BlockingTrioPortal then this callback blocks if the user calls api.stop().
# That is because there is a deadlock using that method: trio cannot run the callback until BadApi.stop()
# returns, but BadApi.stop() won't return until the callback is run
#
# Using TrioToken.run_sync_soon solves that as the callback is placed in a queue and run_sync_soon returns
# immediately. This lets api.stop() return, and then trio picks up the callback up off the queue and creates
# a system task. Among other things, that means that errors are not propagated back to this function, instead
# any errors will crash trio.run(...)
#
# In this case, that is _exactly_ what we want! Because BadApi does not want to know about errors in this
# callback. It is probably going to drop them on the floor, or if we are lucky log them and continue, or maybe
# it will crash... this way, we are explicitly saying self._callback will handle all errors, or we expect the
# program to crash (so we are forced to fix the problem)
self.log.debug(f'_callback_threadsafe ...')
self._trio_token.run_sync_soon(self._callback, *args)
def _callback(self, msg_type, response_id=None, result=None):
# This is called in a system task. That means any bugs in our implementation will crash trio.run(...)
# This isn't terrible (it's better than the error being passed back to the api thread!) but it prints an
# error message like: "trio.TrioInternalError: internal error in trio - please file a bug!"... and people will
# get quite annoyed if we followed that advice!
#
# One idea would be execute this code in self._nursery. However, that alone doesn't buy us much, because after
# the error propagate to the nursery, the nursery is closed. The next message to come from the api (which will
# probably happen no matter how fast we try to stop it?) will hit a 'closed' nursery, which will cause an error
# in the system task anyway.
#
# I'm not sure it makes any sense to think about putting this in a nursery and handling cancellation:
# if there is a bug in this api, probably the only sensible thing is to crash quick-smart and fix the bug -
# it's not something that can be recovered from.
self.log.debug(f'_callback {msg_type} {response_id} {result}')
if msg_type == 'started':
self._started.set()
elif msg_type == 'stopped':
self._stopped.set()
else:
# This is 'partial_response' or 'response' and we made the id callable
# If BadApi forced us to use an int (for example) we would need a look-up
response_id(msg_type, result)
async def do_request(api, i):
receive_channel = await api.request(i)
async with receive_channel:
async for v in receive_channel:
if SIMULATE_ERROR and v == 15:
raise RuntimeError('Simulated error -- v == 15')
print(f'request({i}): {v}')
async def main():
api = TrioApi()
async with trio.open_nursery() as nursery:
await api.start(nursery)
print('Started!')
async with trio.open_nursery() as all_requests:
all_requests.start_soon(do_request, api, 3)
all_requests.start_soon(do_request, api, 4)
all_requests.start_soon(do_request, api, 5)
await api.stop()
print('Stopped!')
# Set SIMULATE_ERROR to True to see an error from the client handling of a request
# Set the logging level to DEBUG for more, or WARN for none.
SIMULATE_ERROR = False
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(threadName)s %(name)s: %(message)s")
trio.run(main)
It doesn't look like there's any action item left on this issue, so I'm going to close it. Glad you got a working solution, and thanks for posting it for the rest of us to look at! I'm leaving a link on #802 as a good example of the sorts of porting problems people are facing.
Below is a complete working example of wrapping a toy api that (I think) is representative of a real-life callback based API.
THANK YOU (both). I have been trying to find a workaround to this exact issue with the blpapi Python implementation for a very long time.
Hopefully someone finds this useful!
Yes. Very.
class BadApi:
Said 3rd party should consider renaming their Python library to this. The utter lack of any semblance of documentation has sadly not improved since this Issue was submitted 5 years ago...
This example doesn't even consider cancellation, but if I update it I'll come back and add some edits.
I would be extremely interested in any other tips or patterns you found when working through the above.
@0zeroth
Apologies for the lack of a reproducible example - I had trouble faking an API that was as badly behaved as the 3rd party API that I am stuck with! I'm hoping the following description will be sufficient to get some useful help. If not, I can chip a way at trying to reproduce the behaviour.
The
stop
function is badly behaved - it consistently causes the_callback_threadsafe
to be called before it returns. How do we handle this in trio? (a detailed explanation follows)The
start
function works as expected. The_api.start()
function returns, and then at a later time, on another thread our_callback_threadsafe
function is called and we schedule_callback
across to the trio thread which sets our event.The
stop
function is badly behaved - it consistently causes the_callback_threadsafe
to be called before it returns, and will not return until_callback_threadsafe
returns. So it decides to act synchronously despite being documented otherwise. As far as I can tell, this causes a deadlock because trio is waiting forApi.stop()
to finish, which is waiting forBadApi.stop()
, which is waiting for the_callback
to finish, which is waiting forBlockingTrioPortal.run_sync
to finish. Butrun_sync
cannot finish, because trio never gets the chance to schedule_callback
.Having the
BlockingTrioPortal
block (the clue is in the name!) is consistent with the trio philosophy of not dropping errors: if we didn't block, how would we know if an error had occured in_callback
? So, perhaps running the_callback
is actually a type of task, and should be handed off to a nursery. I tried that: give theTrioApi
a nursery object, and then use something like (pseudo-code, a littleasync
needs to sprinkled into theTrioApi
):This did not work for me - and at this point I'm not sure if I've just got some silly mistake, or am going about this completely the wrong way.
My intention is palm responsibility for errors onto the nursery.
call_soon
should return immediately, allowing the_portal.run_sync
to complete. Then the nursery can be configured to handle errors in a sensible way.To sum up: how can we schedule a call on the trio thread in a similar way to
asyncio
scall_soon_threadsafe
, but staying within the philosophy oftrio
?