danielgtaylor / python-betterproto

Clean, modern, Python 3.6+ code generator & library for Protobuf 3 and async gRPC
MIT License
1.51k stars 214 forks source link

[Feqture request] Add a way to produce server service implementation with using Stream directly #204

Open lambdalisue opened 3 years ago

lambdalisue commented 3 years ago

I'd love to use grpclib.server.Stream directly instead of request_iterator/yield combination to handle stream/steram gRPC server.

For example, the current way is a bit tricky to write consumer/producer style while yield must be placed directly under the function scope. So I need to use exception or some other tricky way to find if gRPC is closed or not.

import asyncio
from contextlib import suppress

from .proto import ServerBase

# Assume that the followings are inbound/outbound for proxing gRPC messages
inbound: asyncio.Queue[str] = asyncio.Queue()
outbound: asyncio.Queue[str] = asyncio.Queue()

class ServerWithIteratorAndGenerator(ServerBase):
    async def proxy(self, request_iterator: AsyncIterator[str]) -> AsyncIterator[str]:
        class ConsumerClosedError(Exception):
            pass

        async def consumer_handler() -> None:
            async for message in request_iterator:
                await outbound.put(message)
            raise ConsumerClosedError()

        consumer = asyncio.create_task(consumer_handler())

        with suppress(ConsumerClosedError):
            while True:
                producer = inbound.get()
                done, _ = await asyncio.wait(
                    [consumer, producer],
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # NOTE:
                # If 'consumer' is closed (gRPC has closed), the code below raise ConsumerClosedError
                message = await next(done)
                if not message:
                    # inbound is closed.
                    break
                yield message

        # Make sure that consumer is closed
        with suppress(ConsumerClosedError):
            consumer.cancel()

If betterproto produce ServerBase with native grpclib.server.Stream, I could write above code as

import asyncio

from .proto import ServerBase

# Assume that the followings are inbound/outbound for proxing gRPC messages
inbound: asyncio.Queue[str] = asyncio.Queue()
outbound: asyncio.Queue[str] = asyncio.Queue()

class ServerWithNativeStream(ServerBase):
    async def proxy(self, stream: grpclib.server.Stream) -> None:
        async def consumer_handler() -> None:
            while True:
                message = await stream.recv_message()
                if message is None:
                    # gRPC is closed
                    break
                await outbound.put(message)

        async def producer_handler() -> None:
            while True:
                message = await inbound.get()
                if message is None:
                    # inbound is closed
                    break
                await stream.send_message(message)

        consumer = asyncio.create_task(consumer_handler())
        producer = asyncio.create_task(producer_handler())

        done, pending = await asyncio.wait(
            [consumer, producer],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
        for task in done:
            task.result()

So above code does not have any tricky tech. to find if incoming stream is closed.

I know that I can use _SeverBase__rpc_proxy() method to overwrite it but I feel it's a bit redundant and ugly.

import asyncio

from .proto import ServerBase

# Assume that the followings are inbound/outbound for proxing gRPC messages
inbound: asyncio.Queue[str] = asyncio.Queue()
outbound: asyncio.Queue[str] = asyncio.Queue()

class Server(ServerBase):
    async def _ServerBase__rpc_proxy(self, stream: grpclib.server.Stream) -> None:
        async def consumer_handler() -> None:
            while True:
                message = await stream.recv_message()
                if message is None:
                    # gRPC is closed
                    break
                await outbound.put(message)

        async def producer_handler() -> None:
            while True:
                message = await inbound.get()
                if message is None:
                    # inbound is closed
                    break
                await stream.send_message(message)

        consumer = asyncio.create_task(consumer_handler())
        producer = asyncio.create_task(producer_handler())

        done, pending = await asyncio.wait(
            [consumer, producer],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
        for task in done:
            task.result()
kho239 commented 11 months ago

Also, this way it will be possible for client to cancel the stream from server — grpclib has such functionality, namely await stream.cancel()