danielgtaylor / python-betterproto

Clean, modern, Python 3.6+ code generator & library for Protobuf 3 and async gRPC
MIT License
1.51k stars 214 forks source link

AsyncChannel produces unexpected exception on cancel #233

Open SoerenBusse opened 3 years ago

SoerenBusse commented 3 years ago

Hey there,

I'm using the AsyncChannel to send messages to a GRPC server. When receiving a grpc error the client prints some non-catched errors of the sending task in the console (related to: https://github.com/danielgtaylor/python-betterproto/issues/188).

future: <Task finished name='Task-2' coro=<ServiceStub._send_messages() done, defined at /home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/grpclib_client.py:163> exception=ValueError('task_done() called too many times')>
Traceback (most recent call last):
  File "/home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/util/async_channel.py", line 87, in __anext__
    result = await self._queue.get()
  File "/usr/lib/python3.8/asyncio/queues.py", line 163, in get
    await getter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/grpclib_client.py", line 166, in _send_messages
    async for message in messages:
  File "/home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/util/async_channel.py", line 93, in __anext__
    self._queue.task_done()
  File "/usr/lib/python3.8/asyncio/queues.py", line 206, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

When the server sends a grpc-error the error is handled in this except-block, which cancels the sending task: https://github.com/danielgtaylor/python-betterproto/blob/master/src/betterproto/grpc/grpclib_client.py#L159-L161

This cancel-method results in throwing a CancelledError inside the current await of the sending task, which is the get-method on the queue of the AsyncChannel: https://github.com/danielgtaylor/python-betterproto/blob/master/src/betterproto/grpc/util/async_channel.py#L87

In this case the get-method is raising a CancelledError even if the queue is empty. If this happens the task_done-method is called on an empty queue, and the above error message is shown.

A quick fix would be the following, however I haven't checked if this fix has any side effects:

try:
    ...
finally:
    self._waiting_receivers -= 1
    if not self._queue.empty():
        self._queue.task_done()
nat-n commented 3 years ago

Thanks for the detailed report.

So the problem is that we call task_done based on the assumption that we got an item from the queue, but there might not have even been an item on the queue to get in the scenario you describe?

I haven't got time to dive into this properly right now, but off the top of my head I'm wondering does this mean there are probably two problems to solve? :

  1. (L93](https://github.com/danielgtaylor/python-betterproto/blob/master/src/betterproto/grpc/util/async_channel.py#L93) calls task done event
  2. that asyncio.exceptions.CancelledError isn't gracefully handled anywhere (should it be?)

I assume you tried your suggested fix and then it worked fine suggesting 2. isn't an issue?

At any rate, I'm not sure the solution you suggest is quite correct, because if my understanding of the error scenario is correct then the problem isn't that the queue is empty now because that is expected when you successfully get the last item from the queue, but rather that it might have never had anything added at all (or not since the last successful get)!

So maybe the correct solution is to refactor along the lines of:

        self._waiting_receivers += 1
        result = await self._queue.get()
        self._waiting_receivers -= 1
        self._queue.task_done()
        if result is self.__flush:
            return None
        return result

though I'd have to verify that this doesn't break anything. Maybe it's better to instead explicitly check whether an item was got from the queue.

It would be great to have a PR with a test case for this if you're up for it?