pytest-dev / pytest-asyncio

Asyncio support for pytest
https://pytest-asyncio.readthedocs.io
Apache License 2.0
1.43k stars 152 forks source link

gRPC error when using pytest, works with unittest #948

Closed mbrancato closed 1 month ago

mbrancato commented 1 month ago

When running an isolated asyncio test case under pytest, I get the following error which does not show up with using unittest:

    async for response in stream:
  File "/Users/mike/.pyenv/versions/3.11.9/envs/global-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 109, in _wrapped_aiter
    raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.InternalServerError: 500 Internal error from Core

Here is some package info:

Name: pytest
Version: 8.3.3
Summary: pytest: simple powerful testing with Python
Home-page: https://docs.pytest.org/en/latest/
Author: Holger Krekel, Bruno Oliveira, Ronny Pfannschmidt, Floris Bruynooghe, Brianna Laugher, Florian Bruhin, Others (See AUTHORS)
Author-email: 
License: MIT
Location: /Users/mike/.pyenv/versions/3.11.9/envs/global-3.11/lib/python3.11/site-packages
Requires: iniconfig, packaging, pluggy
Required-by: pytest-asyncio, pytest-cov, pytest-mock-resources, pytest-xdist
---
Name: pytest-asyncio
Version: 0.24.0
Summary: Pytest support for asyncio
Home-page: https://github.com/pytest-dev/pytest-asyncio
Author: Tin Tvrtković <tinchester@gmail.com>
Author-email: tinchester@gmail.com
License: Apache 2.0
Location: /Users/mike/.pyenv/versions/3.11.9/envs/global-3.11/lib/python3.11/site-packages
Requires: pytest
Required-by: 

pytest.ini:

[pytest]
asyncio_mode=auto

Running the following code will fail with and RPC error 500 Internal error from Core when run using pytest:

import os
import sys
import traceback
from unittest import IsolatedAsyncioTestCase

from testcontainers.google import PubSubContainer

class TestPubSubService(IsolatedAsyncioTestCase):
    async def test_stuff(self) -> None:
        test_project_id = "test-project"

        pubsub_emulator = PubSubContainer(project=test_project_id)
        print("Starting PubSub emulator", flush=True, file=sys.stderr)
        try:
            pubsub_emulator.start()
            host = pubsub_emulator.get_pubsub_emulator_host()
            os.environ["PUBSUB_EMULATOR_HOST"] = host
            os.environ["PUBSUB_PROJECT_ID"] = test_project_id

            from google.auth import credentials
            from google.pubsub_v1 import SubscriberAsyncClient, StreamingPullRequest
            from google.pubsub_v1.services.subscriber.transports import (
                SubscriberGrpcAsyncIOTransport,
            )

            pubsub_config = {
                "client_options": {"api_endpoint": pubsub_emulator.get_pubsub_emulator_host()},
                "credentials": credentials.AnonymousCredentials(),
            }

            subscriber_client = SubscriberAsyncClient(**pubsub_config)
            transport = subscriber_client.transport
            if isinstance(transport, SubscriberGrpcAsyncIOTransport):
                await transport.grpc_channel.channel_ready()

            subscription_path = f"projects/{test_project_id}/subscriptions/subscription_value"

            def request_generator():
                while True:
                    yield StreamingPullRequest(
                        subscription=subscription_path,
                        stream_ack_deadline_seconds=120,
                    )

            try:
                stream = await subscriber_client.streaming_pull(requests=request_generator())
            except Exception as e:
                print(traceback.format_exc(), flush=True, file=sys.stderr)
                raise e

            print("Listening for messages on subscription", flush=True)

            try:
                async for response in stream:
                    for received_message in response.received_messages:
                        print(received_message.message)
            except Exception as e:
                print(traceback.format_exc(), flush=True, file=sys.stderr)
                raise e
        finally:
            pubsub_emulator.stop()

So it fails with:

pytest test_foo.py

But it passes with:

python -m unittest test_foo.py

It also fails if I mark it async and do not use unittest at all:

@pytest.mark.asyncio
async def test_stuff():
  ...
mbrancato commented 1 month ago

closing as this seems to be a race condition in the gRPC handling / lack of documentation on how to use the request generator.