Closed Skully17 closed 1 year ago
awesome, thanks! feedback rgd the PR: as hinted in https://crossbar.discourse.group/t/rapid-cancelling-of-tasks-can-cause-invalidstateerror/2128, I think it would be better to guard not by catching InvalidStateError, but prevent from running into that by checking if there is indeed a call that was canceled
More like this?
if asyncio.isfuture(on_reply) and on_reply.done():
self.log.warn("call cannot be processed as it has been cancelled")
# above might already have rejected, so we guard ..
elif enc_err:
txaio.reject(on_reply, enc_err)
else:
if msg.kwargs or (call_request.options and call_request.options.details):
kwargs = msg.kwargs or {}
if msg.args:
res = types.CallResult(*msg.args,
callee=msg.callee,
callee_authid=msg.callee_authid,
callee_authrole=msg.callee_authrole,
forward_for=msg.forward_for,
**kwargs)
else:
res = types.CallResult(callee=msg.callee,
callee_authid=msg.callee_authid,
callee_authrole=msg.callee_authrole,
forward_for=msg.forward_for,
**kwargs)
txaio.resolve(on_reply, res)
else:
if msg.args:
if len(msg.args) > 1:
res = types.CallResult(*msg.args)
txaio.resolve(on_reply, res)
else:
txaio.resolve(on_reply, msg.args[0])
else:
txaio.resolve(on_reply, None)
I had something like this first but changed to try/except to get the error message
tldr;
not quite, what I meant was: check if the on_reply future is already canceled, and hence the result should be ignored without raising. sth along:
if not txaio.is_called(on_reply):
txaio.resolve(on_reply, res)
https://github.com/crossbario/autobahn-python/blob/master/autobahn/wamp/protocol.py#L946
this assumes that a future that is canceled counts as called. on both twisted and asyncio (when wrapped via txaio)
long answer: I had a deeper look now ... for one, you can't use asyncio.exceptions.InvalidStateError
since Autobahn supports both asyncio and Twisted, and the protocol.py file contains the network framework independent WAMP client impl.
starting with a call from client, this begins with:
a new call request is stored client side here
and the associated client side future that will be resolved or rejected later is defined as
def canceller(d):
cancel_msg = message.Cancel(request_id)
self._transport.send(cancel_msg)
# since we announced support for cancelling, we should
# definitely get an Error back for our Cancel which will
# clean up this invocation
on_reply = txaio.create_future(canceller=canceller)
on the result receiving side in the client, this starts here
we need to think about race conditions related to
msg.request in self._call_reqs
msg.progress
txaio.is_called(on_reply)
for one, you can't use
asyncio.exceptions.InvalidStateError
Ah, thanks for pointing that out. I didn't realise txaio was capable of checking for futures and if they have been cancelled. I have updated the PR and removed all references to asyncio I made and replaced them with equivalent txaio calls.
I noticed when following the code through that, although unlikely, this can error could happen for the other message types:
I modified my script, that can recreate this error, to subscription calls instead of RPC calls and it has the same problem. Should I apply the same fix to those messages as well?
I have extended the checks to be done in every message type which has a txaio.resolve call in it. Please let me know if I should do anything differently or if I have missed something.
thanks for the changes: yes, looks good now!
sorry for slow response;)
wrapped txaio.resolve() calls in try/except to catch InvalidStateErrors