googleapis / python-pubsub

Apache License 2.0

Asyncio coroutines for common tasks #389

Open: mbrancato opened this issue 3 years ago

mbrancato commented 3 years ago

The current status of asyncio support has mostly been covered here already: https://github.com/googleapis/python-pubsub/issues/218#issuecomment-760778351

This feature request asks for formal, high-level awaitable methods for interacting with the Pub/Sub library, reducing the reliance on threads. That said, if the library still wants to manage threads internally, that seems fine, as long as the interface we use is awaitable and the callbacks run in the same event loop. I don't want to be too prescriptive about the implementation in my request.

Some examples, assuming a SubscriberClient and a PublisherClient have already been set up:

Example subscribe:

```python
# Proposed API: subscribe_async() does not exist in the library today.
async def my_func(msg):
    # ... do something
    pass

subscription_path = f"projects/{project_id}/subscriptions/{subscription}"
streaming_pull_async = subscriber.subscribe_async(subscription_path, callback=my_func)

try:
    await streaming_pull_async.result(timeout=pull_timeout)
except TimeoutError:
    # Handle flow timeout
    pass
```

Example publish:

```python
# Proposed API: publish_async() does not exist in the library today.
topic_path = f"projects/{project_id}/topics/{output_topic}"
publish_value_async = publisher.publish_async(topic_path, value)
await publish_value_async.result()
```

plamut commented 3 years ago

@mbrancato So essentially the request is to offer the same functionality the existing (sync) client offers, but in async/await flavor?

I'll forward it, but I suspect this will require a substantial amount of work to match the current client's features.

mbrancato commented 3 years ago

That's basically it, @plamut. It would remove the need to wrap some of the Pub/Sub calls in things like run_in_executor() and to wrap the callback.

In case anyone needs a solution, a good example of what works now is here: https://github.com/allenporter/python-google-nest-sdm/blob/cb2dc1bb5c61284e6f489f9e8933dfa758645196/google_nest_sdm/google_nest_subscriber.py#L49
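For anyone reading along, "wrapping the callback" can look roughly like the sketch below. It is a minimal illustration, not an official API: it assumes the existing sync `SubscriberClient.subscribe(subscription, callback=...)`, whose callback runs on the library's worker threads and receives a message with `ack()` and `nack()` methods, and `make_threadsafe_callback` is a hypothetical helper name. Each message is handed to a coroutine on an already-running event loop via `asyncio.run_coroutine_threadsafe()`, then acked or nacked depending on the outcome.

```python
import asyncio
from typing import Any, Awaitable, Callable


def make_threadsafe_callback(
    handler: Callable[[Any], Awaitable[None]],
    loop: asyncio.AbstractEventLoop,
    timeout: float = 60.0,
) -> Callable[[Any], None]:
    """Return a sync callback that runs `handler(message)` on `loop`.

    Assumption: used as the `callback` argument of the sync
    SubscriberClient.subscribe(), which calls it from worker threads.
    """

    def callback(message: Any) -> None:
        # Schedule the coroutine on the event loop from this worker thread.
        future = asyncio.run_coroutine_threadsafe(handler(message), loop)
        try:
            # Block this worker thread (not the event loop) until the
            # coroutine finishes, so the message stays "in flight" until then.
            future.result(timeout=timeout)
        except Exception:
            # Handler raised or timed out: stop it (no-op if done) and redeliver.
            future.cancel()
            message.nack()
        else:
            message.ack()

    return callback
```

Usage from inside async code, with `handle` being your own (hypothetical) `async def handle(message)`: `subscriber.subscribe(subscription_path, callback=make_threadsafe_callback(handle, asyncio.get_running_loop()))`. Blocking the worker thread on `future.result()` keeps ack timing tied to the coroutine's completion, at the cost of occupying one library thread per in-flight message.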

plamut commented 3 years ago

@mbrancato I brought this up at a meeting, and it's quite a common request in other client libraries, too.

This feature is being considered, but it's currently in an exploratory phase (e.g. how to avoid duplicating too much of the hand-written logic), as the amount of work required can be substantial.

As such, there's no ETA at the moment, but I'll keep this request open for visibility, and for any future updates.

vikahl commented 2 years ago

This feature is being considered, but it's currently in exploratory phase, e.g. how to approach not to duplicate the hand-written logic too much, etc., as the amount of work required can be substantial.

I understand that there is no ETA for this specific feature, but have there been any thoughts or plans about how to approach the problem in general?

acocuzzo commented 2 years ago

We do want to support keeping up with new language features like async/await. However, we're also careful to keep the API surface similar across languages, so we need to co-ordinate between languages to do this. This is something we will look at for a 2.0 API.

geosach commented 2 years ago

Hello, until a v2.0 API is implemented, could you provide some examples of how to combine the library with projects that use other async libraries? For instance, in our FastAPI-based project we need to implement consumers that store data in a MongoDB database using an async Mongo library (AsyncIOMotorClient). I believe this is a common use case.

Can this be implemented in a thread-safe way, using asyncio and functions like run_until_complete? We can come up with some workarounds, but we don't feel confident that they are safe, since they combine asyncio with the library's internal threading mechanism. Do you have any recommendations or examples?
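Calling `run_until_complete()` from the subscriber callback is not safe: the callback runs on the library's threads, asyncio event loops are not thread-safe, and if the loop is already running (as it is under FastAPI) the call raises `RuntimeError`. One possible pattern, sketched below under assumptions rather than as an official recommendation, is to have the callback push each message onto an `asyncio.Queue` with `loop.call_soon_threadsafe()`, and let worker tasks on the application's own loop do the awaiting (for example Motor writes) and the ack/nack. `make_enqueue_callback`, `worker`, and `store` are hypothetical names; messages are assumed to expose `ack()`/`nack()` like the library's `Message`.

```python
import asyncio
from typing import Any, Awaitable, Callable


def make_enqueue_callback(
    queue: "asyncio.Queue[Any]", loop: asyncio.AbstractEventLoop
) -> Callable[[Any], None]:
    """Sync callback (run on library threads) that forwards messages to `queue`."""

    def callback(message: Any) -> None:
        # asyncio.Queue is not thread-safe, so schedule the put on the loop.
        loop.call_soon_threadsafe(queue.put_nowait, message)

    return callback


async def worker(
    queue: "asyncio.Queue[Any]", store: Callable[[Any], Awaitable[None]]
) -> None:
    """Consume messages on the event loop; ack on success, nack on failure."""
    while True:
        message = await queue.get()
        try:
            await store(message)  # e.g. an async Motor insert (assumption)
        except Exception:
            message.nack()
        else:
            message.ack()
        finally:
            queue.task_done()
```

Because the callback returns immediately, messages still count against the subscriber's flow control limits until they are acked or nacked, so the queue should stay bounded by those settings without a `maxsize` of its own. Under FastAPI, the worker tasks and the `subscribe()` call could be started from a startup hook; that wiring is left out here.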

adriangb commented 2 years ago

+1 for this feature request

samskiter commented 1 year ago

Now that there is a v2.0 version of this library, has an async-flavoured interface been implemented? The documentation on 2.0 is very thin on the ground...

NickNaskida commented 11 months ago

It's almost the end of 2023 and still no updates 😢

+999 for this feature

@plamut

plamut commented 11 months ago

@NickNaskida I am not a maintainer of this library anymore (since the start of 2022), and thus cannot say much about its current state or its roadmap, I'm afraid.

mbrancato commented 11 months ago

@NickNaskida This may help: here is an example of using the async publisher. Note that using these lower-level classes directly may not be officially supported by Google.

```python
import asyncio
import logging
from typing import MutableSequence

from google.api_core.exceptions import DeadlineExceeded, RetryError
from google.pubsub_v1 import PubsubMessage
from google.pubsub_v1.services.publisher.async_client import PublisherAsyncClient
from google.pubsub_v1.services.publisher.transports import PublisherGrpcAsyncIOTransport
from grpc import ChannelConnectivity


async def run():
    publisher_client = PublisherAsyncClient(transport="grpc_asyncio")
    transport = publisher_client.transport
    try:
        if isinstance(transport, PublisherGrpcAsyncIOTransport):
            # Wait until the grpc.aio channel is connected before publishing.
            await transport.grpc_channel.channel_ready()
            state = transport.grpc_channel.get_state(try_to_connect=True)
            if state != ChannelConnectivity.READY:
                logging.error("Channel is not ready")
                return

        messages = [
            PubsubMessage(
                {
                    "data": b"FOO",
                    "attributes": {
                        "message_type": "test",
                        "customer": "acme",
                    },
                }
            ),
            PubsubMessage(
                {
                    "data": b"BAR",
                    "attributes": {
                        "message_type": "test",
                        "customer": "acme",
                    },
                }
            ),
        ]

        if await publish_pubsub(messages, publisher_client):
            logging.info("Published messages")
    finally:
        # Close the channel even if the readiness check or the publish fails.
        await transport.close()


async def publish_pubsub(
    messages: MutableSequence[PubsubMessage],
    client: PublisherAsyncClient,
) -> bool:
    topic = client.topic_path("my_project", "my_topic")

    try:
        resp = await client.publish(topic=topic, messages=messages)
    except (asyncio.TimeoutError, DeadlineExceeded, RetryError):
        # gRPC deadlines typically surface as api_core exceptions
        # (DeadlineExceeded, or RetryError once retries run out of time).
        logging.warning("Timeout pushing event to Pub/Sub")
        return False

    if len(resp.message_ids) != len(messages):
        logging.error("Failed to publish some messages to Pub/Sub")
        return False
    return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(run())
```

steve-marmalade commented 7 months ago

In case it's helpful to anyone else, I ultimately decided to use the synchronous PublisherClient in my FastAPI service and convert the future to an awaitable via asyncio.wrap_future(future). I think this is the easiest way to use GCP Pub/Sub in a Python async environment.
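A minimal sketch of that approach, assuming google-cloud-pubsub 2.x, where (as far as I know) `PublisherClient.publish()` returns a future that subclasses `concurrent.futures.Future`, which is what `asyncio.wrap_future()` accepts. To keep the sketch runnable without credentials, `fake_publish` is a hypothetical stand-in built on a `ThreadPoolExecutor`; `publish_all` is likewise a hypothetical helper name.

```python
import asyncio
import concurrent.futures
from typing import Callable, List, Sequence

# Any callable that takes the payload bytes and returns a concurrent future
# resolving to a message ID string.
PublishFn = Callable[[bytes], "concurrent.futures.Future[str]"]


async def publish_all(publish: PublishFn, payloads: Sequence[bytes]) -> List[str]:
    """Publish payloads and await their message IDs without blocking the loop."""
    # Start every publish before awaiting, so the client can batch them.
    futures = [asyncio.wrap_future(publish(p)) for p in payloads]
    # gather() returns results in the same order as `payloads`.
    return list(await asyncio.gather(*futures))


# Hypothetical stand-in for publisher.publish(topic_path, data); it resolves
# to a fake message ID so the sketch runs offline.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def fake_publish(data: bytes) -> "concurrent.futures.Future[str]":
    return _executor.submit(lambda: f"id-{len(data)}")
```

With the real client, the call would look something like `await publish_all(lambda d: publisher.publish(topic_path, d), [b"FOO", b"BAR"])`, where `publisher` and `topic_path` are set up as usual. The publishing itself still happens on the client's background threads; `wrap_future()` only lets the event loop await the result instead of blocking on `future.result()`.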