apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

google provider: GCP credentials are not passed into transfer job polling when executing S3ToGCSOperator #37331

Open aviramst opened 7 months ago

aviramst commented 7 months ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.9.0

Apache Airflow version

2.7.2

Operating System

Linux

Deployment

Amazon (AWS) MWAA

Deployment details

No response

What happened

When executing S3ToGCSOperator in deferrable mode, the operator creates the data transfer jobs successfully but fails to poll their status: the trigger looks for GCP Application Default Credentials instead of using the provided gcp_conn_id.

[2024-02-11, 12:48:21 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:21 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:22 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:22 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:23 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:23 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:24 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:24 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:25 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:25 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:26 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:26 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 1000 files
[2024-02-11, 12:48:27 UTC] {{cloud_storage_transfer_service.py:217}} INFO - Created job transferJobs/XXX
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:324}} INFO - Submitted job transferJobs/XXX to transfer 530 files
[2024-02-11, 12:48:27 UTC] {{s3_to_gcs.py:330}} INFO - Overall submitted 10 jobs to transfer 9530 files
[2024-02-11, 12:48:27 UTC] {{taskinstance.py:1526}} INFO - Pausing task as DEFERRED. dag_id=transfer_files, task_id=s3_to_gcs_example, execution_date=20240211T124244, start_date=20240211T124816
[2024-02-11, 12:48:27 UTC] {{local_task_job_runner.py:225}} INFO - Task exited with return code 100 (task deferral)
[2024-02-11, 12:48:28 UTC] {{cloud_storage_transfer_service.py:64}} INFO - Attempting to request jobs statuses
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_metadata.py:139}} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 22] Invalid argument
[2024-02-11, 12:48:28 UTC] {{_default.py:338}} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transfer_files.s3_to_gcs_example manual__2024-02-11T12:42:44+00:00 [queued]>
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1359}} INFO - Resuming after deferral
[2024-02-11, 12:48:31 UTC] {{taskinstance.py:1382}} INFO - Executing <Task(S3ToGCSOperator): s3_to_gcs_example> on 2024-02-11 12:42:44+00:00
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:57}} INFO - Started process 24431 to run task
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'transfer_files', 's3_to_gcs_example', 'manual__2024-02-11T12:42:44+00:00', '--job-id', '170059', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpr6aa0mal']
[2024-02-11, 12:48:31 UTC] {{standard_task_runner.py:85}} INFO - Job 170059: Subtask s3_to_gcs_example

[2024-02-11, 12:48:32 UTC] {{baseoperator.py:1600}} ERROR - Trigger failed:
Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 526, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 598, in run_trigger
    async for event in trigger.run():

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py", line 67, in run
    jobs_pager = await async_hook.get_jobs(job_names=self.job_names)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 520, in get_jobs
    client = self.get_conn()
             ^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 510, in get_conn
    self._client = StorageTransferServiceAsyncClient()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/async_client.py", line 225, in __init__
    self._client = StorageTransferServiceClient(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/client.py", line 441, in __init__
    self._transport = Transport(
                      ^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/grpc_asyncio.py", line 198, in __init__
    super().__init__(

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/cloud/storage_transfer_v1/services/storage_transfer_service/transports/base.py", line 99, in __init__
    credentials, _ = google.auth.default(
                     ^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/google/auth/_default.py", line 691, in default
    raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)

google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

What you think should happen instead

I think S3ToGCSOperator should create the Storage Transfer Service client with the GCP credentials provided via gcp_conn_id.
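
For context, the generated Storage Transfer client accepts an explicit credentials object, so the async hook does not have to fall back to Application Default Credentials. The snippet below is only a minimal sketch of that idea, not the provider's implementation; the keyfile_dict handling is an assumption about how the connection's credentials might be obtained.

# Minimal sketch (not the provider's code): build the Storage Transfer client
# with explicit credentials instead of letting it call google.auth.default().
from google.cloud.storage_transfer_v1 import StorageTransferServiceAsyncClient
from google.oauth2 import service_account


def make_transfer_client(keyfile_dict: dict) -> StorageTransferServiceAsyncClient:
    # keyfile_dict is assumed to be the service-account JSON stored in the
    # Airflow GCP connection (gcp_conn_id) extras.
    credentials = service_account.Credentials.from_service_account_info(keyfile_dict)
    # Passing credentials explicitly avoids the DefaultCredentialsError seen above.
    return StorageTransferServiceAsyncClient(credentials=credentials)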

How to reproduce

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

default_args = {
    'owner': 'myself',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

my_dag = DAG(
    's3_to_gcs_transfer',
    default_args=default_args,
    description='Transfer files from S3 to GCS',
    schedule_interval=None,
    catchup=False,
)

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="my-prefix",
    apply_gcs_prefix=True,
    gcp_conn_id="my-gcp-conn-id",  # connection that should also be used for job polling
    aws_conn_id="my-aws-conn-id",
    dest_gcs="gs://my-gcs-bucket/",
    replace=False,
    deferrable=True,  # the failure only appears with the deferrable trigger
    dag=my_dag,
)
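
As a possible stopgap while a provider fix lands (the comments below indicate only the deferrable trigger loses the connection id), disabling deferral avoids the trigger path entirely; this is an assumed workaround based on that observation, not a confirmed fix.

# Assumed workaround, not a confirmed fix: run the operator without deferral so
# that no trigger is created and nothing in the triggerer falls back to ADC.
s3_to_gcs_op_sync = S3ToGCSOperator(
    task_id="s3_to_gcs_example_sync",
    bucket="my-s3-bucket",
    prefix="my-prefix",
    apply_gcs_prefix=True,
    gcp_conn_id="my-gcp-conn-id",
    aws_conn_id="my-aws-conn-id",
    dest_gcs="gs://my-gcs-bucket/",
    replace=False,
    deferrable=False,  # skip the trigger path that ignores gcp_conn_id
    dag=my_dag,
)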

Anything else

No response

Are you willing to submit PR?


boring-cyborg[bot] commented 7 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Taragolis commented 7 months ago

Looks like this only affects deferrable mode: CloudStorageTransferServiceCreateJobsTrigger does not have an option to provide a connection id

https://github.com/apache/airflow/blob/90e2b12d6b99d2f7db43e45f5e8b97d3b8a43b36/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py#L33-L40
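
For reference, a fix along those lines would presumably add a gcp_conn_id argument to the trigger and include it in the serialized kwargs, so the triggerer process can rebuild the async hook with the right connection. The sketch below only illustrates that shape and is not the actual provider change; the project_id and poll_interval parameters are assumed.

# Rough sketch only, not the actual provider code: carry the connection id
# through the trigger so it survives serialization into the triggerer.
from airflow.triggers.base import BaseTrigger


class CloudStorageTransferServiceCreateJobsTrigger(BaseTrigger):
    def __init__(self, job_names, project_id=None, poll_interval=10, gcp_conn_id="google_cloud_default"):
        super().__init__()
        self.job_names = job_names
        self.project_id = project_id
        self.poll_interval = poll_interval
        self.gcp_conn_id = gcp_conn_id  # new: forwarded when the async hook is built

    def serialize(self):
        # gcp_conn_id must be part of the serialized kwargs, otherwise it is
        # lost when the trigger is rehydrated inside the triggerer process.
        return (
            "airflow.providers.google.cloud.triggers.cloud_storage_transfer_service"
            ".CloudStorageTransferServiceCreateJobsTrigger",
            {
                "job_names": self.job_names,
                "project_id": self.project_id,
                "poll_interval": self.poll_interval,
                "gcp_conn_id": self.gcp_conn_id,
            },
        )

    async def run(self):
        # Polling logic omitted; a real implementation would build the async
        # Storage Transfer hook with self.gcp_conn_id and yield a TriggerEvent.
        yield  # placeholder so this stays an async generator, as triggers require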

korolkevich commented 7 months ago

Hello! I am investigating this issue now and will try to prepare a fix for it.

o-nikolas commented 7 months ago

Assigned to you @korolkevich :)

eladkal commented 6 months ago

The remaining task on this issue is explained in: https://github.com/apache/airflow/pull/37518#discussion_r1541078941