python-trio / purerpc

Native, async Python gRPC client and server implementation supporting asyncio, uvloop, and trio
Apache License 2.0

ClientStubStreamStream uses yield inside a task group #36

Closed VincentVanlaer closed 1 year ago

VincentVanlaer commented 1 year ago

See https://github.com/python-trio/purerpc/blob/9b70109b02807b75346f8f6d331316963dd363c8/src/purerpc/wrappers.py#L110-L115

This causes internal corruption in trio.
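As a rough illustration of why a `yield` inside a scope is hazardous, the sketch below uses a toy scope stack. It is only an analogy for how a per-task stack of cancel scopes can end up out of order and is not trio's actual implementation. The names `scope`, `scope_stack`, `violations`, and `responses` are hypothetical.

```python
import asyncio
from contextlib import contextmanager

scope_stack = []   # toy stand-in for a per-task stack of open scopes
violations = []    # (expected, found) pairs recorded when scopes close out of order


@contextmanager
def scope(name):
    """Push a named scope on entry and check that it is on top at exit."""
    scope_stack.append(name)
    try:
        yield
    finally:
        top = scope_stack.pop()
        if top != name:
            violations.append((name, top))


async def responses():
    # The generator suspends at the yield while its scope is still open.
    with scope("generator"):
        yield 1
        yield 2


async def main():
    gen = responses()
    with scope("consumer"):
        # The generator's scope is pushed on top of the consumer's scope...
        await gen.__anext__()
    # ...so the consumer's scope was closed while it was not on top of the stack.
    await gen.aclose()


asyncio.run(main())
print(violations)
```

Both scopes close while the other one is on top of the stack, which is the kind of nesting violation that a yield inside a task group makes possible.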

I think there are two potential solutions:

* Restructure call_aiter so that the yield happens outside the task group:

class ClientStubStreamStream(ClientStub):
    async def call_aiter(self, message_aiter, metadata):
        # If send_multiple is cancelled while trying to send a message,
        # that message will be saved in this variable
        current_message = NO_MESSAGE_WAITING

        async def send_multiple():
            nonlocal current_message

            while True:
                try:
                    # Check if we got cancelled while trying to send a message
                    if current_message is NO_MESSAGE_WAITING:
                        current_message = await message_aiter.__anext__()
                except StopAsyncIteration:
                    return
                await stream.send_message(current_message)  # This must be cancel safe
                current_message = NO_MESSAGE_WAITING

        stream = await self._stream_fn(metadata=metadata)
        async with aclosing(message_aiter) as message_aiter:
            try:
                while True:
                    async with anyio.create_task_group() as task_group:
                        task_group.start_soon(send_multiple)
                        try:
                            value = await stream_to_async_iterator(stream).__anext__()
                        except StopAsyncIteration:
                            return
                        task_group.cancel_scope.cancel()
                    # Outside the task group
                    yield value
            finally:
                stream.close()

* Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.

I prefer the second option as it is significantly cleaner and has fewer sharp edges regarding cancel safety.
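To make the cancel-safety concern of the first option concrete, here is a minimal asyncio-only sketch of the "remember the message that was in flight" idea. It assumes a hypothetical `slow_send` in place of `stream.send_message` and uses plain asyncio tasks rather than anyio task groups, so it only illustrates the bookkeeping, not purerpc's behaviour.

```python
import asyncio

NO_MESSAGE_WAITING = object()  # sentinel: no message is pending a resend


async def demo():
    sent = []
    pending = NO_MESSAGE_WAITING
    source = iter(["a", "b", "c"])
    in_flight = asyncio.Event()

    async def slow_send(message):
        # Hypothetical send: it can be cancelled before the message is committed.
        in_flight.set()
        await asyncio.sleep(0.01)
        sent.append(message)

    async def send_multiple():
        nonlocal pending
        while True:
            # Only pull a new message if the previous one was fully sent.
            if pending is NO_MESSAGE_WAITING:
                try:
                    pending = next(source)
                except StopIteration:
                    return
            await slow_send(pending)
            pending = NO_MESSAGE_WAITING

    # First run: cancel the sender while "a" is still in flight.
    task = asyncio.create_task(send_multiple())
    await in_flight.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    after_cancel = list(sent)

    # Second run: the sender resumes with the saved message instead of dropping it.
    await send_multiple()
    return after_cancel, sent


after_cancel, sent = asyncio.run(demo())
print(after_cancel, sent)
```

The pattern only works if the send either commits the message or leaves no trace when cancelled, which is the sharp edge noted above.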

belm0 commented 1 year ago

Thank you. I wonder why my project hasn't hit this. Do you have an example that demonstrates it?

VincentVanlaer commented 1 year ago

Adding the following to tests/test_errors.py triggers the bug for me:

@purerpc_channel("port")
async def test_errors_purerpc_in_client(greeter_pb2, greeter_grpc, channel):
    async def generator():
        for _ in range(7):
            yield greeter_pb2.HelloRequest()

    stub = greeter_grpc.GreeterStub(channel)

    async for resp in stub.SayHelloToMany(generator()):
        if resp.message == "2":
            raise ValueError("oops")
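For context on why raising inside the `async for` body matters: Python does not close an async generator when the loop body raises, so the generator stays suspended at its `yield` until something closes it explicitly or it is finalized later. The following stdlib-only sketch shows that ordering; `responses` and `log` are hypothetical stand-ins for the stub call and are not purerpc code.

```python
import asyncio


async def responses(log):
    try:
        for i in range(7):
            yield str(i)
    finally:
        # Runs only when the generator is exhausted, closed, or finalized.
        log.append("generator cleanup")


async def main():
    log = []
    gen = responses(log)
    try:
        async for message in gen:
            if message == "2":
                raise ValueError("oops")
    except ValueError:
        # The generator is still suspended at its yield at this point.
        log.append("consumer saw ValueError")
    await gen.aclose()
    return log


log = asyncio.run(main())
print(log)
```

The consumer observes the error before the generator's cleanup runs, so any scope the generator holds open around its `yield` outlives the loop that raised.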

belm0 commented 1 year ago

> Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.

purerpc has few users and, since there have been no bug reports in 5 years about calling the stub with an async generator, it doesn't seem anyone is using the feature. Therefore, I'm not so concerned about an API change.

However, it's not trivial to use trio-util, because purerpc is built on anyio. (I don't have plans for an anyio variation of trio-util due to lack of sane test support, namely the missing autojump_clock.)

It seems like the async generator support is only a convenience of the API. Isn't it possible for the API user to achieve the equivalent, given only the standard stub call? If so, I propose removing the feature from the API, considering that this project is in "maintenance mode".

VincentVanlaer commented 1 year ago

> > Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.
>
> purerpc has few users and, since there have been no bug reports in 5 years about calling the stub with an async generator, it doesn't seem anyone is using the feature. Therefore, I'm not so concerned about an API change.
>
> However, it's not trivial to use trio-util, because purerpc is built on anyio. (I don't have plans for an anyio variation of trio-util due to lack of sane test support, namely the missing autojump_clock.)

I don't think this needs a full wrapper like trio-util provides. I can't test it now, but the following should work:

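from contextlib import asynccontextmanager

import anyio

# aclosing and stream_to_async_iterator are assumed to be the helpers
# already used in purerpc's wrappers.py

async def stream_stream_sender(stream, message_aiter):
    # Forward every request to the stream and close the source iterator when done
    async with aclosing(message_aiter) as message_aiter:
        async for message in message_aiter:
            await stream.send_message(message)

async def stream_stream_receiver(stream):
    stream = stream_to_async_iterator(stream)

    async for message in stream:
        yield message

@asynccontextmanager
async def stream_stream(self, message_aiter, metadata):
    stream = await self._stream_fn(metadata=metadata)

    # The task group lives in the context manager, not in the generator
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(stream_stream_sender, stream, message_aiter)
        yield stream_stream_receiver(stream)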

You would then need to use it as

async with some_stub.StreamStreamCall(some_generator()) as resp_gen:
    async for resp in resp_gen:
        ...
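The shape of this API can be tried out without purerpc. The sketch below is a self-contained asyncio approximation, assuming a hypothetical `EchoStream` that echoes each request back and a plain asyncio task in place of the anyio task group. It is meant to show the calling convention and the cleanup on error, not the real stub.

```python
import asyncio
from contextlib import asynccontextmanager


class EchoStream:
    """Hypothetical stand-in for a purerpc stream: echoes each message back."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send_message(self, message):
        await self._queue.put(f"echo:{message}")

    async def close_send(self):
        await self._queue.put(None)  # end-of-stream marker

    async def receive_message(self):
        return await self._queue.get()


async def sender(stream, message_aiter):
    async for message in message_aiter:
        await stream.send_message(message)
    await stream.close_send()


async def receiver(stream):
    # No task or scope is held open across this yield.
    while True:
        message = await stream.receive_message()
        if message is None:
            return
        yield message


@asynccontextmanager
async def stream_stream(message_aiter):
    stream = EchoStream()
    send_task = asyncio.create_task(sender(stream, message_aiter))
    responses = receiver(stream)
    try:
        yield responses
    finally:
        # Cleanup runs when the `async with` block exits, even on error.
        await responses.aclose()
        send_task.cancel()
        try:
            await send_task
        except asyncio.CancelledError:
            pass


async def requests():
    for i in range(3):
        yield i


async def main():
    seen = []
    try:
        async with stream_stream(requests()) as responses:
            async for resp in responses:
                seen.append(resp)
                if resp == "echo:1":
                    raise ValueError("oops")
    except ValueError:
        seen.append("cleaned up")
    return seen


result = asyncio.run(main())
print(result)
```

Because the background sender is owned by the context manager rather than by the generator, an exception raised in the loop body unwinds through the `async with` block and the sender is stopped deterministically.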

> It seems like the async generator support is only a convenience of the API. Isn't it possible for the API user to achieve the equivalent, given only the standard stub call? If so, I propose removing the feature from the API, considering that this project is in "maintenance mode".

If I understand it correctly, the generators are the only way to use stream-stream calls in the current purerpc API. You'd have to replicate the logic in https://github.com/python-trio/purerpc/blob/9e96b7d6d5436d4b196c1cece208f907fee9c3a9/src/purerpc/client.py#L44-L66 with a custom ClientStubStreamStream implementation.

About the maintenance mode, I was planning to submit some PRs with some improvements (adding typing information to the stubs, unix socket support, proper handling of connection failures). I could let those changes live in a fork, but that seems like needless fragmentation to me. Unless I missed something in my search, I don't think there is an alternative for using gRPC with trio? I'm happy to help maintain this project a bit, but would still need someone to review the PRs (given the "don't merge your own PR" rules).

belm0 commented 1 year ago

> the generators are the only way to use stream-stream calls in the current purerpc api

> I don't think this needs a full wrapper like trio-util provides. I can't test it now, but the following should work:

I see. If you have time to work on a PR, I'm happy to review. It would be important to add unit test coverage for the new stub method.

About other improvements, I'm happy to review, but I'd like to consider carefully whether the changes will add to the maintenance burden. In other projects, I've found even things like adding type annotations to be a burden, because of added dependencies on projects with alpha stability, and having to battle the type system with every future PR.