apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

BeamRunPythonPipelineOperator doesn't work with Google Application Default Credentials ADC #42396

Open fpopic opened 2 months ago

fpopic commented 2 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3 (problem occurs in latest version as well, will try to download latest and post log as well)

What happened?

When manually submitting an Apache Beam Python job to the Google Dataflow runner using BeamRunPythonPipelineOperator, the task gets stuck in apitools, which uses oauth2client and tries to initiate a browser-based Google sign-in, which must fail. I don't understand why the authentication flow ends up in that execution branch, since a credential of type authorized_user exists in the well-known path ~/.config/gcloud/application_default_credentials.json.

[2024-09-21, 21:05:58 UTC] {taskinstance.py:1328} INFO - Executing <Task(BeamRunPythonPipelineOperator): submit_beam_job> on 2022-01-01 00:00:00+00:00
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:57} INFO - Started process 2105 to run task
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'dag-example-beam-dataflow-python', 'submit_beam_job', 'scheduled__2022-01-01T00:00:00+00:00', '--job-id', '716', '--raw', '--subdir', 'DAGS_FOLDER/dag_example_beam_dataflow_python/dag_example_beam_dataflow_python.py', '--cfg-path', '/tmp/tmprcjz83eb']
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:85} INFO - Job 716: Subtask submit_beam_job
[2024-09-21, 21:05:58 UTC] {task_command.py:414} INFO - Running <TaskInstance: dag-example-beam-dataflow-python.submit_beam_job scheduled__2022-01-01T00:00:00+00:00 [running]> on host 9743bdb39e14
[2024-09-21, 21:05:58 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dag-example-beam-dataflow-python' AIRFLOW_CTX_TASK_ID='submit_beam_job' AIRFLOW_CTX_EXECUTION_DATE='2022-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2022-01-01T00:00:00+00:00'
[2024-09-21, 21:05:58 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.

[2024-09-21, 21:05:58 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-09-21, 21:05:58 UTC] {beam.py:198} INFO - {'job_name': 'simple-beam-job-7d346a20', 'project': 'XXX', 'region': 'europe-west1', 'labels': {'airflow-version': 'v2-6-3-composer'}}
[2024-09-21, 21:05:59 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-09-21, 21:05:59 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-09-21, 21:05:59 UTC] {logging_mixin.py:150} WARNING - /opt/python3.8/lib/python3.8/site-packages/google/auth/_default.py:78 UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. See the following page for troubleshooting: https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.

[2024-09-21, 21:06:05 UTC] {beam.py:271} INFO - Beam version: 2.50.0
[2024-09-21, 21:06:05 UTC] {beam.py:131} INFO - Running command: python3 /home/airflow/gcs/dags/dag_example_beam_dataflow_python/src/beam/job_example_beam_dataflow_python.py --runner=DataflowRunner --job_name=simple-beam-job-7d346a20 --project=XXX --region=europe-west1 --labels=airflow-version=v2-6-3-composer --worker_machine_type=n1-standard-1 --disk_size_gb=10 --num_workers=1
[2024-09-21, 21:06:05 UTC] {beam.py:142} INFO - Start waiting for Apache Beam process to complete.
[2024-09-21, 21:06:07 UTC] {beam.py:113} INFO - 0

[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - Generating new OAuth credentials ...
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - Your browser has been opened to visit:
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -     https://accounts.google.com/o/oauth2/v2/auth?client_id=XXXX.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8090%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute.readonly+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email&access_type=offline&response_type=code
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - If your browser is on a different machine then exit and re-run this
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - application with the command-line parameter
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -   --noauth_local_webserver
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
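
To confirm that the well-known file really holds a credential of type authorized_user on the machine (or container) where the task runs, a small check like the one below may help. It is only a sketch using the standard library: the function name adc_credential_type is made up for illustration, and the path assumes Linux or macOS.

```python
import json
from pathlib import Path
from typing import Optional

# Well-known ADC location on Linux/macOS, as quoted in the report above.
DEFAULT_ADC_PATH = (
    Path.home() / ".config" / "gcloud" / "application_default_credentials.json"
)


def adc_credential_type(path: Path = DEFAULT_ADC_PATH) -> Optional[str]:
    """Return the "type" field of the ADC file, or None if the file is missing.

    Hypothetical helper for illustration; it is not part of Airflow or Beam.
    """
    if not path.is_file():
        return None
    with path.open(encoding="utf-8") as fh:
        return json.load(fh).get("type")


if __name__ == "__main__":
    print(f"{DEFAULT_ADC_PATH}: type={adc_credential_type()!r}")
```

Running it with the same user and environment as the Airflow worker shows whether the subprocess started by the operator could see the same file at all.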

What you think should happen instead?

Airflow should submit the job to Dataflow using Application Default Credentials, the same way standalone Apache Beam Python (without Airflow) does.

I see that Apache Beam already solved that problem in https://github.com/apache/beam/pull/15004, so running Apache Beam Python without Airflow using ADC works.

How to reproduce

Prepare env. variables:

unset GOOGLE_APPLICATION_CREDENTIALS
unset GCP_PROJECT
gcloud auth application-default login
gcloud config set project <project>

and execute the following DAG

# -*- coding: utf-8 -*-
import os
from datetime import datetime
from airflow.models import DAG

from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
)
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowConfiguration,
)

current_path = os.path.dirname(__file__)

with DAG(
    dag_id="dag-example-beam-dataflow-python-adc",
    default_args={"owner": "airflow"},
    start_date=datetime(2024, 1, 1),
    schedule_interval="@once",
    catchup=True,
) as dag:
    start_dag = EmptyOperator(task_id="start_dag")
    end_dag = EmptyOperator(task_id="end_dag")

    submit_beam_job = BeamRunPythonPipelineOperator(
        task_id="submit_beam_job_with_dataflow_using_adc",
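        # Resolve the job file relative to this DAG file (current_path was otherwise unused).
        py_file=os.path.join(current_path, "job_example_beam_dataflow_python_adc.py"),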
        runner="DataflowRunner",
        pipeline_options={
            "temp_location": "<bucket>",
            "staging_location": "<bucket>",
        },
        dataflow_config=DataflowConfiguration(
            job_name="submit_beam_job_with_dataflow_using_adc",
            project_id="<project_id>",
            location="<location>",
            wait_until_finished=True,
        ),
        do_xcom_push=True,
    )

    start_dag >> submit_beam_job >> end_dag

and the Apache Beam job source code, job_example_beam_dataflow_python_adc.py:

# -*- coding: utf-8 -*-
import argparse
import logging
import apache_beam as beam
from apache_beam import Create
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

class PrintElementDoFn(beam.DoFn):
    def process(self, element, *args, **kwargs):
        print(f"Processing element {element}.")

def run(argv=None):
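    parser = argparse.ArgumentParser()
    # Assumed definition: --sleep must be declared, otherwise known_args.sleep raises
    # AttributeError. The default of 0 matches the "0" printed in the task log.
    parser.add_argument("--sleep", type=int, default=0)

    known_args, pipeline_args = parser.parse_known_args(argv)
    print(known_args.sleep)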

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    p | "create dummy events" >> Create([1]) | "print dummy elements" >> beam.ParDo(
        PrintElementDoFn()
    )

    p.run()

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()
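
The unset GOOGLE_APPLICATION_CREDENTIALS step above matters because ADC checks that variable before it looks at the gcloud user credentials file. The sketch below mirrors that documented lookup order in a simplified form, so it can be run before triggering the DAG. The function name describe_adc_source is hypothetical, and the real google.auth.default() performs more checks than this.

```python
import os
from pathlib import Path
from typing import Mapping


def describe_adc_source(env: Mapping[str, str], home: Path) -> str:
    """Describe which credential source ADC would likely pick (simplified).

    Hypothetical helper for illustration. Order: explicit key file from the
    environment variable, then the gcloud well-known file, then nothing local.
    """
    explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        return f"explicit key file from GOOGLE_APPLICATION_CREDENTIALS: {explicit}"

    well_known = home / ".config" / "gcloud" / "application_default_credentials.json"
    if well_known.is_file():
        return f"gcloud user credentials: {well_known}"

    return "no local credentials found; ADC would fall back to an attached service account, if any"


if __name__ == "__main__":
    print(describe_adc_source(os.environ, Path.home()))
```

For the reproduction described here, the expected answer is the second branch, the gcloud user credentials file.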

Operating System

macOS 14.7

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==5.3.0

Deployment

Docker Airflow / Composer Airflow (doesn't matter, problem occurs in latest version as well).

Deployment details

Anything else?

No response

Are you willing to submit PR?

Lee-W commented 1 month ago

Hi @fpopic, as you checked "Yes I am willing to submit a PR!", I'll assign this to you. But please let us know if you are no longer interested in it. Thanks!

fpopic commented 1 month ago

I removed the checkmark, would appreciate help.

fpopic commented 1 month ago

Adding the minimal changes to the latest setup that are needed to reproduce.

In docker-compose.yml, pass the host ADC config with write permission (to refresh the host token from within the container):

x-airflow-common:
  &airflow-common
  #...
  build: .
  #image: ...
  environment:
    &airflow-common-env
    # ...
    AIRFLOW__LOGGING__LOGGING_LEVEL: 'WARNING'
    AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: '{ "conn_type": "google_cloud_platform", "extra": { "scope": "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/drive,https://www.googleapis.com/auth/bigquery",  "num_retries": 0 } }'
    GOOGLE_CLOUD_PROJECT: <my-project>

  volumes: &airflow-common-volumes
    # ...
    # Use ADC
    - ~/.config/gcloud/:/home/airflow/.config/gcloud:rw
...
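
Whether the mount above actually arrived with the intended permissions can be checked from inside the container. The following is a minimal sketch, assuming the container path from the volume mapping; check_adc_mount is a made-up name, and os.access reports permissions for the user running the script (for root it reports writable regardless of mode bits).

```python
import os
from pathlib import Path
from typing import Dict


def check_adc_mount(config_dir: Path) -> Dict[str, bool]:
    """Report whether the mounted gcloud config dir looks usable for ADC.

    Hypothetical helper for illustration only.
    """
    adc_file = config_dir / "application_default_credentials.json"
    return {
        "dir_exists": config_dir.is_dir(),
        # Write access is what allows a refreshed token to be stored.
        "dir_writable": os.access(config_dir, os.W_OK),
        "adc_file_readable": os.access(adc_file, os.R_OK),
    }


if __name__ == "__main__":
    # Container-side path taken from the volume mapping above.
    for name, ok in check_adc_mount(Path("/home/airflow/.config/gcloud")).items():
        print(f"{name}: {ok}")
```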

In the Dockerfile, install the Google Cloud SDK (gcloud command), since the Google base hook requires the gcloud CLI: https://github.com/apache/airflow/blob/2.10.2/airflow/providers/google/common/hooks/base_google.py#L638-L669

# specifying explicit python 3.9 otherwise gcloud sdk installation complains
FROM apache/airflow:2.10.2-python3.9
USER root

## COPY FROM https://github.com/apache/airflow/blob/main/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
ARG CLOUD_SDK_VERSION=322.0.0
ENV GCLOUD_HOME=/home/airflow/google-cloud-sdk
ENV PATH="${GCLOUD_HOME}/bin/:${PATH}"

USER airflow

RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz" \
    && TMP_DIR="$(mktemp -d)" \
    && curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/google-cloud-sdk.tar.gz" \
    && mkdir -p "${GCLOUD_HOME}" \
    && tar xzf "${TMP_DIR}/google-cloud-sdk.tar.gz" -C "${GCLOUD_HOME}" --strip-components=1 \
    && "${GCLOUD_HOME}/install.sh" \
       --bash-completion=false \
       --path-update=false \
       --usage-reporting=false \
       --additional-components alpha beta kubectl \
       --quiet \
    && rm -rf "${TMP_DIR}" \
    && rm -rf "${GCLOUD_HOME}/.install/.backup/" \
    && gcloud --version
# END OF COPY

# pip install airflow has to be run with USER airflow
# Extras are quoted so the shell never treats the square brackets as a glob pattern.
RUN pip install 'apache-airflow==2.10.2' \
    'apache-airflow[google_auth]' \
    apache-airflow-providers-google \
    'apache-airflow[google]' \
    apache-airflow-providers-apache-beam
    #apache-airflow-providers-hashicorp \

RUN mkdir -p /home/airflow/.config/gcloud/

eladkal commented 3 weeks ago

cc @VladaZakharova