apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

PubsubPullTrigger does not pass gcp_conn_id to underlying hook #42160

Closed nickmarx12345678 closed 1 month ago

nickmarx12345678 commented 2 months ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

    apache-airflow==2.5.3
    apache-airflow-providers-cncf-kubernetes==7.10.0
    apache-airflow-providers-common-sql==1.8.1
    apache-airflow-providers-ftp==3.6.1
    apache-airflow-providers-google==10.12.0
    apache-airflow-providers-http==4.7.0
    apache-airflow-providers-imap==3.4.0
    apache-airflow-providers-pagerduty==2.1.2
    apache-airflow-providers-slack==8.4.0
    apache-airflow-providers-sqlite==3.5.0
    google-cloud-pubsub==2.18.4

Apache Airflow version

v2.5.3+composer

Operating System

Google Cloud Composer (unsure)

Deployment

Google Cloud Composer

Deployment details

No response

What happened

When using PubSubPullSensor in deferrable mode with a non-default GCP connection, the sensor fails with a permissions error even though the relevant service account has the required permissions (verified via the gcloud command line).

airflow-triggerer , poke_interval=10.0, gcp_conn_id=cre-raw-data-ingest-prod-service-account, impersonation_chain=None> (ID 7752) fired: TriggerEvent<{'status': 'error', 'message': "('Error pulling messages from subscription projects/xp-raw-data-ingest-staging/subscriptions/ili-subscription', PermissionDenied('User not authorized to perform this action.'))"}> 

With the same arguments and connection, in non-deferrable mode, we observe successful sensor operation (ability to wait, pull, and ack messages).

It appears the underlying trigger does not pass the connection ID parameters along to the hook.

In the non-deferrable pathway, we can see gcp_conn_id and impersonation_chain being passed to the hook.

In our triggerer logs, we see evidence that the default GCP connection ID is used instead:

2024-09-11 09:59:49.344 PDT
airflow-triggerer Using connection ID 'google_cloud_default' for task execution. 
2024-09-11 09:59:49.346 PDT
airflow-triggerer Getting connection using `google.auth.default()` since no explicit credentials are provided. 
2024-09-11 09:59:49.361 PDT
airflow-triggerer Pulling max 1 messages from subscription (path) projects/xp-raw-data-ingest-staging/subscriptions/ili-subscription 
2024-09-11 09:59:49.433 PDT
airflow-triggerer Trigger <airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger project_id=xp-raw-data-ingest-staging, subscription=ili-subscription, max_messages=1, ack_messages=True, messages_callback=def _default_message_callback( 
2024-09-11 09:59:49.434 PDT
airflow-triggerer     pulled_messages: List[ReceivedMessage], 
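
The suspected behavior can be illustrated with a minimal, self-contained sketch. It does not use the actual provider code: `FakeHook`, `BuggyTrigger`, and `FixedTrigger` are hypothetical stand-ins, and the assumption is simply that the trigger stores `gcp_conn_id` and `impersonation_chain` but builds its hook without forwarding them, so the hook falls back to `google_cloud_default`.

```python
from dataclasses import dataclass
from typing import Optional, Sequence

# Default connection ID, as seen in the triggerer logs above.
DEFAULT_CONN_ID = "google_cloud_default"


@dataclass
class FakeHook:
    """Hypothetical stand-in for a Pub/Sub hook; records the connection it would use."""

    gcp_conn_id: str = DEFAULT_CONN_ID
    impersonation_chain: Optional[Sequence[str]] = None


@dataclass
class BuggyTrigger:
    """Hypothetical trigger that keeps the connection settings but never forwards them."""

    gcp_conn_id: str
    impersonation_chain: Optional[Sequence[str]] = None

    def get_hook(self) -> FakeHook:
        # Nothing is passed here, so the hook uses its default connection ID.
        return FakeHook()


@dataclass
class FixedTrigger:
    """Hypothetical trigger that forwards the connection settings to the hook."""

    gcp_conn_id: str
    impersonation_chain: Optional[Sequence[str]] = None

    def get_hook(self) -> FakeHook:
        return FakeHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )


buggy_hook = BuggyTrigger(gcp_conn_id="the_conn_id").get_hook()
fixed_hook = FixedTrigger(gcp_conn_id="the_conn_id").get_hook()

print(buggy_hook.gcp_conn_id)  # google_cloud_default
print(fixed_hook.gcp_conn_id)  # the_conn_id
```

If the real trigger behaves like `BuggyTrigger`, that would be consistent with both the "Using connection ID 'google_cloud_default'" log line and the PermissionDenied error, since the default credentials may not have access to the subscription.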

How to reproduce

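    from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

    # Fails with PermissionDenied when deferrable=True; works when deferrable=False.
    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id="the_project",
        gcp_conn_id="the_conn_id",  # non-default GCP connection
        subscription="the-subscription",
        deferrable=True,  # or False
        max_messages=1,
    )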

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 2 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

gopidesupavan commented 2 months ago

Yes, there is another issue being worked on. I will check and update here if this fix can be included along with it.