googleapis / python-pubsub

Apache License 2.0

Async subscriber should raise RuntimeError outside event loop, streaming pull fails #1279

Closed mbrancato closed 2 weeks ago

mbrancato commented 1 month ago

When using google.pubsub_v1.SubscriberAsyncClient, awaiting the coroutine method streaming_pull can hang indefinitely with no indication of why. This happens when the SubscriberAsyncClient is created outside a running event loop (that is, outside an async function, for example at module level).
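The issue title proposes that the client fail fast with a RuntimeError instead of hanging. A minimal standard-library sketch of that behavior follows; `require_running_loop` and `LoopBoundClient` are hypothetical names used only for illustration and are not part of google-cloud-pubsub or grpc.

```python
import asyncio


def require_running_loop(owner: str) -> asyncio.AbstractEventLoop:
    """Hypothetical guard: return the running loop, or raise a clear RuntimeError."""
    try:
        # asyncio.get_running_loop() raises RuntimeError when called
        # outside a coroutine or callback running on an event loop.
        return asyncio.get_running_loop()
    except RuntimeError as exc:
        raise RuntimeError(
            f"{owner} must be created inside a running event loop, "
            "for example within an async function started by asyncio.run()"
        ) from exc


class LoopBoundClient:
    """Hypothetical async client that binds to the loop it was created on."""

    def __init__(self) -> None:
        self._loop = require_running_loop(type(self).__name__)

    async def call(self) -> str:
        # Refuse to run on a different loop instead of waiting forever.
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("client is bound to a different event loop")
        return "ok"


async def main() -> str:
    client = LoopBoundClient()  # created inside the loop: allowed
    return await client.call()


if __name__ == "__main__":
    try:
        LoopBoundClient()  # created at module level: fails fast
    except RuntimeError as err:
        print(f"RuntimeError: {err}")
    print(asyncio.run(main()))  # prints "ok"
```

The design choice is to check at construction time, where the mistake is made, rather than at the first await, where the symptom (a hang) shows up.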

Note: I'm only able to reproduce this on macOS.

Environment details

Steps to reproduce

  1. Create a SubscriberAsyncClient outside a running event loop (e.g. at module level)
  2. Await subscriber.streaming_pull() from inside the event loop started by asyncio.run()
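The hang in step 2 can be imitated with plain asyncio, assuming (as the report suggests) that the client's internals complete their work through a loop captured at construction time, which asyncio.run() never drives. In this hypothetical sketch, `stale_loop` stands in for that captured loop: the completion callback queued on it never fires, so only the timeout ends the wait, much like the `asyncio.wait_for` call in the reporter's code example.

```python
import asyncio


async def wait_for_completion(stale_loop: asyncio.AbstractEventLoop, timeout: float) -> str:
    done = asyncio.Event()  # used on the loop created by asyncio.run()
    # Queue the "completion" on stale_loop. If stale_loop is never run,
    # done.set() is never executed and the wait below can only time out.
    stale_loop.call_soon(done.set)
    try:
        await asyncio.wait_for(done.wait(), timeout=timeout)
        return "completed"
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    # Created outside any running loop, like the module-level client in step 1
    # (an assumption about what the real client captures).
    stale_loop = asyncio.new_event_loop()
    try:
        print(asyncio.run(wait_for_completion(stale_loop, timeout=0.1)))
    finally:
        stale_loop.close()
```

Passing the running loop instead of `stale_loop` makes the same coroutine return "completed", which is the analogue of creating the client inside `main()`.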

Code example

As a workaround, create the subscriber inside the main async function instead of at module level.

import asyncio
import logging

from google.api_core.retry import AsyncRetry
from google.pubsub_v1 import SubscriberAsyncClient, StreamingPullRequest

logging.basicConfig(level=logging.DEBUG)
subscriber = SubscriberAsyncClient()

async def main():
    subscription_path = subscriber.subscription_path("fake-project", "fake-subscription")
    requests = [StreamingPullRequest(subscription=subscription_path)]

    async def _requests():
        for request in requests:
            yield request

    retry = AsyncRetry(timeout=5)
    stream = await asyncio.wait_for(
        subscriber.streaming_pull(requests=_requests(), retry=retry), timeout=10
    )
    async for response in stream:
        print(response)

if __name__ == "__main__":
    asyncio.run(main())
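The workaround creates the client inside `main()`. If a module-level handle is still convenient, one option is to defer construction until a loop is running. A minimal sketch, where `LazyLoopClient` is a hypothetical helper (not a library API) and `object` stands in for `SubscriberAsyncClient` so the example runs without credentials:

```python
import asyncio
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazyLoopClient(Generic[T]):
    """Hypothetical holder that builds a client on first use inside a running loop."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._instance: Optional[T] = None

    def get(self) -> T:
        # Raises RuntimeError when called outside a running loop, rather than
        # binding the client to a loop that will never be driven.
        loop = asyncio.get_running_loop()
        if self._instance is None:
            self._instance = self._factory()
            self._loop = loop
        elif loop is not self._loop:
            raise RuntimeError("client was created on a different event loop")
        return self._instance


# Safe at module level: nothing is constructed yet. With the real library this
# would presumably be LazyLoopClient(SubscriberAsyncClient); object is a stand-in.
subscriber_holder = LazyLoopClient(object)


async def main() -> bool:
    subscriber = subscriber_holder.get()  # constructed on asyncio.run()'s loop
    return subscriber is subscriber_holder.get()  # same loop, same instance


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Reusing the holder from a second `asyncio.run()` raises RuntimeError instead of silently reusing a client bound to the first, already closed loop.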

Stack trace

% python stream_async.py      
DEBUG:google.auth._default:Checking None for explicit credentials as part of auth process...
DEBUG:google.auth._default:Checking Cloud SDK credentials as part of auth process...
DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:grpc._cython.cygrpc:Using AsyncIOEngine.POLLER as I/O engine
DEBUG:asyncio:Using selector: KqueueSelector
Traceback (most recent call last):
  File "/Users/mike/.pyenv/versions/3.11.10/lib/python3.11/asyncio/tasks.py", line 500, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "/Users/mike/.pyenv/versions/global-3.11/lib/python3.11/site-packages/google/api_core/retry/retry_unary_async.py", line 230, in retry_wrapped_func
    return await retry_target(
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/mike/.pyenv/versions/global-3.11/lib/python3.11/site-packages/google/api_core/retry/retry_unary_async.py", line 155, in retry_target
    return await target()
           ^^^^^^^^^^^^^^
  File "/Users/mike/.pyenv/versions/global-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 178, in error_remapped_callable
    await call.wait_for_connection()
  File "/Users/mike/.pyenv/versions/global-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 77, in wait_for_connection
    await self._call.wait_for_connection()
  File "/Users/mike/.pyenv/versions/global-3.11/lib/python3.11/site-packages/grpc/aio/_call.py", line 537, in wait_for_connection
    await self._metadata_sent.wait()
  File "/Users/mike/.pyenv/versions/3.11.10/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/mike/Documents/Code/detection/tests/stream_async.py", line 28, in <module>
    asyncio.run(main())
  File "/Users/mike/.pyenv/versions/3.11.10/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/mike/.pyenv/versions/3.11.10/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mike/.pyenv/versions/3.11.10/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/mike/Documents/Code/detection/tests/stream_async.py", line 20, in main
    stream = await asyncio.wait_for(
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mike/.pyenv/versions/3.11.10/lib/python3.11/asyncio/tasks.py", line 502, in wait_for
    raise exceptions.TimeoutError() from exc
TimeoutError
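The two tracebacks are one chained error: when the 10-second `asyncio.wait_for` deadline expires, it cancels the inner await (the `CancelledError` raised while waiting in `wait_for_connection`) and raises `TimeoutError` from it. A small standard-library sketch of catching that and inspecting the cause, where `never_completes` is a hypothetical stand-in for the stuck `streaming_pull()` call; the chaining shown matches the CPython 3.11 traceback above, and other versions may differ.

```python
import asyncio


async def never_completes() -> None:
    # Hypothetical stand-in for an await that is never woken up.
    await asyncio.Event().wait()


async def call_with_deadline(timeout: float) -> str:
    try:
        await asyncio.wait_for(never_completes(), timeout=timeout)
    except asyncio.TimeoutError as err:
        # wait_for cancelled the inner coroutine and chained the resulting
        # CancelledError as the TimeoutError's __cause__.
        cause = type(err.__cause__).__name__
        return f"timed out after {timeout}s (cause: {cause})"
    return "completed"


if __name__ == "__main__":
    print(asyncio.run(call_with_deadline(0.1)))
```

Without the `wait_for` deadline, the same await would simply block, which is the silent hang described at the top of the issue.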


Thanks!

mukund-ananthu commented 2 weeks ago

@mbrancato The recommended way to subscribe to messages with this library is the subscriber client in the google/cloud/pubsub_v1 code path, rather than the google/pubsub_v1 code path. To clarify, is there a reason or end goal that requires the pubsub_v1 code path instead of the cloud/pubsub_v1 code path?
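For comparison, the recommended path is `google.cloud.pubsub_v1.SubscriberClient.subscribe()`, which runs the streaming pull in the background and invokes a callback for each message, so the caller does not manage an asyncio event loop. The sketch below is adapted from the library's published samples; `pull_messages`, `project_id`, `subscription_id`, and the timeout are placeholders, and running it requires the `google-cloud-pubsub` package and valid credentials.

```python
from concurrent import futures
from typing import Optional


def pull_messages(project_id: str, subscription_id: str, timeout: Optional[float] = 10.0) -> None:
    # Imported inside the function so this module loads without google-cloud-pubsub.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message) -> None:
        print(f"Received {message.data!r}")
        message.ack()

    # subscribe() starts the streaming pull in the background and returns a
    # StreamingPullFuture immediately.
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    with subscriber:  # closes the client when the block exits
        try:
            streaming_pull_future.result(timeout=timeout)
        except futures.TimeoutError:
            streaming_pull_future.cancel()  # trigger shutdown
            streaming_pull_future.result()  # block until shutdown completes
```

Usage would be `pull_messages("fake-project", "fake-subscription")`, mirroring the names in the original report.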

mukund-ananthu commented 2 weeks ago

Closing since there has been no response, but feel free to reopen if needed.