shabbyrobe / grpc-stubs

gRPC typing stubs for Python
MIT License
35 stars 21 forks source link

intercept_stream_stream in grpc.aio incorrectly marked as a coroutine #47

Open arvidfm opened 1 year ago

arvidfm commented 1 year ago

Description of issue

grpc.aio.StreamStreamClientInterceptor.intercept_stream_stream is marked as async in grpc-stubs:

https://github.com/shabbyrobe/grpc-stubs/blob/master/grpc-stubs/aio/__init__.pyi#L351

But grpc.aio (as of version 1.54.2) actually fails if you attempt to pass an interceptor with an async implementation.

The same likely applies to the equivalent methods in the other interceptor types.

Minimum Reproducible Example

The following passes strict type checking with mypy, but fails at runtime. Remove async from the intercept_stream_stream function signature to make it work.

main.py ```py import asyncio from collections.abc import AsyncIterator, Callable from typing import TYPE_CHECKING, cast, Union, AsyncIterable, Iterable import grpc.aio from grpc.aio import ClientCallDetails, StreamStreamCall import pow_pb2 import pow_pb2_grpc if TYPE_CHECKING: BaseInterceptor = grpc.aio.StreamStreamClientInterceptor[pow_pb2.Message, pow_pb2.Message] else: BaseInterceptor = grpc.aio.StreamStreamClientInterceptor class MyInterceptor(BaseInterceptor): async def intercept_stream_stream( self, continuation: Callable[ [ClientCallDetails, pow_pb2.Message], StreamStreamCall[pow_pb2.Message, pow_pb2.Message], ], client_call_details: ClientCallDetails, request_iterator: Union[AsyncIterable[pow_pb2.Message], Iterable[pow_pb2.Message]], ) -> Union[AsyncIterator[pow_pb2.Message], StreamStreamCall[pow_pb2.Message, pow_pb2.Message]]: print(client_call_details.method) # https://github.com/shabbyrobe/grpc-stubs/issues/46 return continuation(client_call_details, request_iterator) # type: ignore[arg-type] async def streamer() -> AsyncIterator[pow_pb2.Message]: for i in range(10): yield pow_pb2.Message(Number=i) class PowerServicer(pow_pb2_grpc.PowerServicer): async def Pow( self, request_iterator: AsyncIterable[pow_pb2.Message], context: grpc.aio.ServicerContext[pow_pb2.Message, pow_pb2.Message], ) -> AsyncIterator[pow_pb2.Message]: async for i in request_iterator: yield pow_pb2.Message(Number=i.Number**2) async def start_server() -> None: server = grpc.aio.server() pow_pb2_grpc.add_PowerServicer_to_server(PowerServicer(), server) server.add_insecure_port("[::]:50051") await server.start() await server.wait_for_termination() async def call_server() -> None: channel = grpc.aio.insecure_channel( "localhost:50051", interceptors=[cast(grpc.aio.ClientInterceptor, MyInterceptor())], ) stub = pow_pb2_grpc.PowerStub(channel) if TYPE_CHECKING: async_stub = cast(pow_pb2_grpc.PowerAsyncStub, stub) else: async_stub = stub result: pow_pb2.Message async for result in async_stub.Pow(streamer()): print(result.Number) async def main() -> None: asyncio.create_task(start_server()) await asyncio.sleep(1) await call_server() if __name__ == "__main__": asyncio.run(main()) ```
pow.proto ```protobuf syntax = "proto3"; service Power { rpc Pow(stream Message) returns (stream Message) {}; } message Message { float Number = 1; } ```
run.sh ```sh #!/usr/bin/env bash set -o errexit -o nounset -o pipefail python -m venv venv source ./venv/bin/activate pip install grpcio==1.54.2 grpcio-tools==1.54.2 mypy==1.3.0 git+https://github.com/shabbyrobe/grpc-stubs.git git+https://github.com/nipunn1313/mypy-protobuf.git python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. --mypy_out=. --mypy_grpc_out=. pow.proto python main.py ```
shabbyrobe commented 1 year ago

Thank you for supplying an MRE. I was able to reproduce a failure locally with your example. Both the documentation and the source mark these methods as async though... I'm not really sure why.

https://grpc.github.io/grpc/python/grpc_asyncio.html#grpc.aio.StreamStreamClientInterceptor.intercept_stream_stream https://grpc.github.io/grpc/python/_modules/grpc/aio/_interceptor.html#StreamStreamClientInterceptor.intercept_stream_stream

My apologies, but, I'm too far removed from both async python and gRPC at the moment to be able to adequately respond to this issue with the info I have, and given I've had quite a lot of trouble with contributions to these aio typings so far, I think I'll need a little more convincing about the right course of action here.

macro1 commented 5 months ago

hi @arvidfm. i think you're missing an await in your intercept_stream_stream()

maybe something more like this:

class MyInterceptor(BaseInterceptor):

    async def intercept_stream_stream(
        self,
        continuation: Callable[  # type: ignore[override]
            [ClientCallDetails, AsyncIterator[pow_pb2.Message]],
            Awaitable[StreamStreamCall[pow_pb2.Message, pow_pb2.Message]],
        ],
        client_call_details: ClientCallDetails,
        request_iterator: AsyncIterator[pow_pb2.Message],  # type: ignore[override]
    ) -> Union[
        AsyncIterator[pow_pb2.Message],
        StreamStreamCall[pow_pb2.Message, pow_pb2.Message],
    ]:
        return await continuation(client_call_details, request_iterator)

which is similar to how code is written in grpc's tests: https://github.com/grpc/grpc/blob/3fe06af9a305ff6df97e9b936a9a94fc4350526c/src/python/grpcio_tests/tests_aio/unit/client_stream_stream_interceptor_test.py#L36

i'm not sure about the iterable vs iterator stuff.. clearly streams would be iterators, as the messages are yielded. seems like a different issue than you're reporting though