googleapis / python-pubsub

Async Streaming Pull Usage #1174

Open ash2703 opened 5 months ago

ash2703 commented 5 months ago

Has anyone used this in production scenarios? I am stuck and unable to pull messages using the async streaming client, and any help would be beneficial.

Would wrapping the standard streaming pull in an asyncio executor give me the same behaviour as the client below?

https://github.com/googleapis/python-pubsub/blob/ff229a5fdd4deaff0ac97c74f313d04b62720ff7/google/pubsub_v1/services/subscriber/async_client.py#L1368
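For comparison, the executor-style alternative asked about here usually means bridging the standard client's thread-based callback into the event loop. Below is a minimal sketch of that bridge only. The names `make_threadsafe_callback`, `consume`, and `demo` are hypothetical, and a plain thread stands in for the library's callback thread, so nothing in it talks to Pub/Sub.

```python
import asyncio
import threading


def make_threadsafe_callback(loop, queue):
    """Return a callback that can be called safely from a non-asyncio thread."""

    def callback(message):
        # asyncio.Queue is not thread-safe, so schedule the put on the loop's own thread.
        loop.call_soon_threadsafe(queue.put_nowait, message)

    return callback


async def consume(queue, count):
    """Collect `count` messages from the queue inside the event loop."""
    received = []
    for _ in range(count):
        received.append(await queue.get())
    return received


async def demo():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    callback = make_threadsafe_callback(loop, queue)

    def fake_subscriber_thread():
        # Stand-in (assumption) for the thread in which the standard client
        # would invoke the callback with each received message.
        for i in range(3):
            callback(f"msg-{i}")

    worker = threading.Thread(target=fake_subscriber_thread)
    worker.start()
    received = await consume(queue, 3)
    worker.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the real client, the callback would be passed to `pubsub_v1.SubscriberClient().subscribe(subscription_path, callback=callback)`. That delivers messages into asyncio code, but underneath it is still the thread-based streaming pull rather than the async gRPC stream linked above, so the behaviour is similar and not identical.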

This is my current usage:

from google import pubsub_v1
subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
    try:
        print(f"Listening for messages on {self.subscription_path}...")
        request = pubsub_v1.StreamingPullRequest(
            subscription=self.subscription_path,
        )

        # Not sure if there is any other way to do this
        async def request_generator():
            yield request
        print(f"stream: {request}")  # Code gets blocked here

        stream = await self.subscriber.streaming_pull(requests=request_generator())
        print(f"stream: {stream}")
        # Handle the response
        async for response in stream:
            for received_message in response.received_messages:
                print("Received message: ", received_message.message.data.decode('utf-8'))
    except Exception as e:
        raise e

sudheer-ag commented 5 months ago

The client is unable to pull messages because the underlying gRPC library closes the stream immediately after opening it: https://github.com/grpc/grpc/blob/e4daabc8bcc2a72652a843f1aeabd58eec9331b5/src/python/grpcio/grpc/aio/_call.py#L476 This happens because the request generator yields only a single request and is then exhausted. We can solve the issue by sending periodic heartbeat requests to the server:

import asyncio

from google import pubsub_v1


class AsyncStreamingRequestIterator:
    """Async iterator that yields the initial request, then periodic heartbeats."""

    def __init__(self, initial_request):
        self.request = initial_request

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Send the initial request first
        if self.request:
            return_value = self.request
            self.request = None
            return return_value
        # Then keep the stream open with a heartbeat request every 30 seconds
        await asyncio.sleep(30)
        return pubsub_v1.StreamingPullRequest(stream_ack_deadline_seconds=900)

subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
    try:
        print(f"Listening for messages on {self.subscription_path}...")
        request = pubsub_v1.StreamingPullRequest(
            subscription=self.subscription_path,
        )

        stream = await self.subscriber.streaming_pull(requests=AsyncStreamingRequestIterator(request), timeout=None)
        print(f"stream: {stream}")
        # Handle the response
        async for response in stream:
            for received_message in response.received_messages:
                print("Received message: ", received_message.message.data.decode('utf-8'))
    except Exception as e:
        raise e
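
The same heartbeat idea can also be written as an async generator, which stays closer to the original `request_generator()`. This is a minimal sketch and not a tested Pub/Sub integration: plain dicts stand in for `pubsub_v1.StreamingPullRequest` objects, the interval is shortened so the demo finishes quickly, and `heartbeat_requests`, `take`, `demo`, and the subscription path are hypothetical names used only for illustration.

```python
import asyncio


async def heartbeat_requests(initial_request, make_heartbeat, interval=30.0):
    """Yield the initial request once, then a heartbeat every `interval` seconds."""
    yield initial_request
    while True:
        # The generator never finishes, so the consumer never sees it exhausted.
        await asyncio.sleep(interval)
        yield make_heartbeat()


async def take(requests, count):
    """Pull the first `count` requests; a stand-in for the side consuming the iterator."""
    taken = []
    async for request in requests:
        taken.append(request)
        if len(taken) == count:
            break
    return taken


async def demo():
    # Plain dicts stand in for StreamingPullRequest objects (assumption).
    requests = heartbeat_requests(
        initial_request={"subscription": "projects/p/subscriptions/s"},
        make_heartbeat=lambda: {"stream_ack_deadline_seconds": 900},
        interval=0.01,
    )
    try:
        return await take(requests, 3)
    finally:
        # Close the generator explicitly instead of leaving it to loop shutdown.
        await requests.aclose()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the real client, the generator would be passed as `requests=heartbeat_requests(request, lambda: pubsub_v1.StreamingPullRequest(stream_ack_deadline_seconds=900))` in the `streaming_pull` call above.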