googleapis / python-pubsub

Apache License 2.0

Python Pub/Sub subscriber client not pulling messages when a filter is applied to the subscription #1026

Open · ff-sdesai opened this issue 10 months ago

ff-sdesai commented 10 months ago


I am using a Pub/Sub streaming pull subscription in my Python web application. When no subscription filter is applied, the subscriber client pulls messages from the subscription successfully. However, once a subscription filter is applied, the subscriber stops pulling messages.

If I manually open the subscription in the console and click 'Pull', I can see that there are messages in it (which must have matched the filter, since they are present in the subscription). But the client cannot pull any of these messages. Do I need any additional configuration on the client? My subscriber client code is as follows:

    import os

    from google.cloud import pubsub_v1
    from app.services.subscription_service import save_bill_events
    from app.utils.constants import BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID
    from app.utils.logging_tracing_manager import get_logger

    logger = get_logger(__file__)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        save_bill_events(message.data)
        message.ack()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(os.environ.get(BILL_SUBSCRIPTION_GCP_PROJECT_ID),
                                                     BILL_EVENT_SUBSCRIPTION_ID)

    # Limit the subscriber to a fixed number of outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=50)
    # subscribe() opens the streaming pull immediately (here, at import time)
    # and delivers messages to `callback` on a background thread pool.
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)

    async def poll_bill_subscription():
        with subscriber:
            try:
                # When `timeout` is not set, result() blocks indefinitely unless an
                # exception is raised first. It is a blocking call, so inside an
                # async function it also blocks the event loop.
                streaming_pull_future.result()
            except Exception:
                # Log the failure. At this point the streaming pull has stopped, and
                # leaving the `with` block closes the subscriber; nothing restarts it.
                logger.error(
                    f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}",
                    exc_info=True)

The filter applied to the subscription is:

    attributes.tenant_id = "1" OR attributes.tenant_id="e8a63d46-35bf-5e1c-acec-5d2495b7ae59" OR attributes.tenant_id="2" OR attributes.tenant_id="3"
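The comment in the snippet says the subscriber "should keep listening" after an exception, but the code logs once and then exits the `with` block. A minimal sketch of one way to get that behavior is a loop that opens a new streaming pull whenever the previous one fails, with exponential backoff. `run_with_restarts`, `start_pull`, `max_restarts`, and `backoff_seconds` are hypothetical names introduced here; the only thing assumed about the pull object is a blocking `result()` method, which the future returned by `subscribe()` provides.

```python
import logging
import time
from typing import Callable, Optional, Protocol

logger = logging.getLogger(__name__)


class PullFuture(Protocol):
    """Anything with a blocking result(), such as the future returned by subscribe()."""

    def result(self, timeout: Optional[float] = None) -> object: ...


def run_with_restarts(
    start_pull: Callable[[], PullFuture],
    max_restarts: int = 5,
    backoff_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Block on a streaming pull, opening a new one each time it fails.

    Returns the number of restarts made before a stream ended without an
    error. Re-raises the last error once max_restarts is exhausted.
    """
    restarts = 0
    while True:
        future = start_pull()  # hypothetical factory that opens a new stream
        try:
            future.result()  # blocks until the stream stops
            return restarts  # stream ended without an error
        except Exception:
            logger.exception("Streaming pull failed (restart %d)", restarts)
            if restarts >= max_restarts:
                raise
            restarts += 1
            # Exponential backoff: backoff_seconds, then 2x, 4x, ...
            sleep(backoff_seconds * 2 ** (restarts - 1))
```

With the reporter's objects, `start_pull` could be `lambda: subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)`. Because `result()` blocks, an async application would run this loop in a worker thread (for example with `asyncio.to_thread`, Python 3.9+) rather than directly inside a coroutine. Restarting only helps when the stream fails with an exception; it would not detect the silent no-delivery behavior described later in this thread.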


ff-sdesai commented 10 months ago

Strangely, it works intermittently: it worked two or three times and then stopped working again with the same tenant_id.

ff-sdesai commented 9 months ago

Update: it's not working again. I tried downgrading the google-cloud-pubsub SDK to 2.18.1, with no effect. I'm not sure how it worked a few times on 2.18.4 and then stopped working again without any code change.

ff-sdesai commented 9 months ago

@pradn I think I've found the reason for this behavior. We first create a subscription with a filter and then, immediately afterwards, create a subscriber using the SDK. Without a filter, the subscriber is created successfully and starts receiving messages. With a filter, however, the subscriber does not appear to subscribe successfully, even though no error is shown. It starts working fine after a redeployment/restart. Can you confirm whether this is the case, and is there a workaround?

pradn commented 9 months ago

> However, when I specify a filter, it looks like the subscriber is not able to subscribe successfully although there is no error shown. It starts working fine after a redeployment/restart. Can you confirm if this is the case and is there any workaround to handle this?

Are you sure there are messages that satisfy the filter criteria? Subscription creation succeeds even if the filter matches no messages. When you pulled in Pantheon (the Cloud Console), are you sure it was from the subscription with the filter? Can you verify that the messages have the attributes you expect?

No additional settings are required on the subscriber client to pull from a subscription with filters.
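One way to answer the question about attributes is to check the attributes of sample messages (for example, messages pulled from an unfiltered subscription on the same topic) against the filter locally. The sketch below mirrors only the specific filter from this issue: exact, case-sensitive equality on the `tenant_id` attribute, where a missing attribute does not match. `ALLOWED_TENANT_IDS` and `matches_tenant_filter` are hypothetical names, and this is not a general Pub/Sub filter evaluator. Pub/Sub filters apply to message attributes only, so a `tenant_id` carried in the message body would never match.

```python
from typing import Mapping, Optional

# Hypothetical mirror of the subscription filter in this issue:
# attributes.tenant_id = "1" OR ... = "e8a63d46-..." OR ... = "2" OR ... = "3"
ALLOWED_TENANT_IDS = frozenset(
    {"1", "e8a63d46-35bf-5e1c-acec-5d2495b7ae59", "2", "3"}
)


def matches_tenant_filter(attributes: Optional[Mapping[str, str]]) -> bool:
    """Return True if the attributes would satisfy the OR-of-equalities filter.

    Only models this filter's shape: exact string equality on `tenant_id`.
    A message without the attribute (or without any attributes) does not match.
    """
    if not attributes:
        return False
    return attributes.get("tenant_id") in ALLOWED_TENANT_IDS
```

Inside a callback, `matches_tenant_filter(message.attributes)` would flag values that look right in a log but differ in detail, such as surrounding whitespace.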

ff-sdesai commented 9 months ago

Yes. If I do not add a sleep (of at least 1 second) after creating the subscription, I can see the messages in the subscription, but the subscriber client (most likely) does not pull them. If I add the sleep, it works flawlessly.
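The workaround described here comes down to a fixed delay between creating the filtered subscription and opening the subscriber. A minimal sketch that makes that ordering explicit follows. `create_then_subscribe`, `create_subscription`, `start_subscriber`, and `settle_seconds` are hypothetical names; the 1-second default reflects only the reporter's observation, not a documented guarantee from Pub/Sub.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def create_then_subscribe(
    create_subscription: Callable[[], None],
    start_subscriber: Callable[[], T],
    settle_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Create the (filtered) subscription, wait a fixed delay, then subscribe.

    settle_seconds is an empirical value taken from this thread, not a
    documented propagation time.
    """
    create_subscription()
    sleep(settle_seconds)  # the delay the reporter found necessary with a filter
    return start_subscriber()
```

Injecting `sleep` keeps the ordering testable; in production the default `time.sleep` is used, and `start_subscriber` would wrap the `subscriber.subscribe(...)` call.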

pradn commented 9 months ago

I forgot something even more basic: a subscription only receives messages that were published after the subscription was created. So a subscription with a filter won't receive messages published before its creation (we don't go back and filter previously published messages).

Maybe what's happening when you add a sleep is that the messages published during that 1 second are being delivered?
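The delivery rule described above can be illustrated with a toy in-memory model. A subscription's backlog starts empty, and each published message is copied only to subscriptions that already exist and whose filter matches. `ToyTopic` is a hypothetical teaching model, not the real service. For example, the real service automatically acknowledges messages that do not match a subscription's filter, which this model simply never stores.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

Filter = Callable[[Dict[str, str]], bool]


@dataclass
class ToyTopic:
    """Toy model of topic-to-subscription fan-out (not the real Pub/Sub service)."""

    subscriptions: Dict[str, Filter] = field(default_factory=dict)
    backlogs: Dict[str, List[bytes]] = field(default_factory=dict)

    def create_subscription(self, name: str, flt: Filter = lambda attrs: True) -> None:
        self.subscriptions[name] = flt
        self.backlogs[name] = []  # starts empty: earlier messages are not replayed

    def publish(self, data: bytes, **attributes: str) -> None:
        # Only subscriptions that exist at publish time get a copy,
        # and only if their filter matches the message attributes.
        for name, flt in self.subscriptions.items():
            if flt(attributes):
                self.backlogs[name].append(data)
```

In this model, a message published before `create_subscription` never reaches the new subscription, with or without a filter.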

ff-sdesai commented 9 months ago

I'm aware of that. As I mentioned above, I can see the messages in the subscription if I click 'Pull' manually. It's only the subscriber that never pulls those messages when it is created right after a subscription with a filter. If I create a subscription without a filter, the subscriber works without any added delay.

pradn commented 9 months ago

Can you please share code snippets for the two scenarios, with comments explaining what messages are pulled in each case?

mohammadtapad commented 4 months ago

Any updates on this? We're using the Pub/Sub operator in Airflow, and it fails to fetch the messages.

mukund-ananthu commented 4 months ago

Hi @mohammadtapad, could you please file a customer support ticket so that this issue can be triaged accordingly? Thanks!