shabbyrobe / grpc-stubs

gRPC typing stubs for Python
MIT License
35 stars 21 forks source link

Add stubs for grpc.aio #13

Closed ColdrickSotK closed 3 years ago

ColdrickSotK commented 3 years ago

The grpc.aio module has good type annotation, but without these stubs existing mypy falls over when using it whilst the gRPC stubs in this repository are installed.

The stubs themselves are autogenerated with stubgen and tidied up by hand.

shabbyrobe commented 3 years ago

Looks good. Thanks for the PR! Would you mind providing a few representative, anonymised snippets of code where you're using this module?

ColdrickSotK commented 3 years ago

Sorry for the delay in replying. We're currently only using the async channel constructors and client-side interceptors, and still using the synchronous code already covered for our server-side implementation.

This is pretty representative of the extent of our use currently, and reproduces the issue solved by this PR.

from grpc import aio

class ExampleInterceptor(aio.UnaryUnaryClientInterceptor):

    def __init__(self):
        self._metadata = ("example_header", "example_value")

    def _add_metadata(self, call_details):
        if call_details.metadata is not None:
            new_metadata = list(call_details.metadata)
        else:
            new_metadata = []

        new_metadata.append(self._metadata)
        return aio.ClientCallDetails(
            call_details.method,
            call_details.timeout,
            new_metadata,
            call_details.credentials,
            call_details.wait_for_ready
        )

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        new_details = self._add_metadata(client_call_details)
        return await continuation(new_details, request)

channel = aio.insecure_channel("localhost:50051", interceptors=[ExampleInterceptor()])

# Do some gRPC requests