apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Using the AwsGlueCrawlerOperator generates an error the first time a crawler is created #19109

Closed 094459 closed 3 years ago

094459 commented 3 years ago

Apache Airflow version

2.0.2

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

[2021-10-20 14:40:23,892] {{bash.py:173}} INFO - apache-airflow-providers-amazon==1.4.0
[2021-10-20 14:40:23,944] {{bash.py:173}} INFO - apache-airflow-providers-celery==1.0.1
[2021-10-20 14:40:23,961] {{bash.py:173}} INFO - apache-airflow-providers-databricks==1.0.1
[2021-10-20 14:40:23,977] {{bash.py:173}} INFO - apache-airflow-providers-docker==1.2.0
[2021-10-20 14:40:23,999] {{bash.py:173}} INFO - apache-airflow-providers-ftp==1.0.1
[2021-10-20 14:40:24,015] {{bash.py:173}} INFO - apache-airflow-providers-http==1.1.1
[2021-10-20 14:40:24,030] {{bash.py:173}} INFO - apache-airflow-providers-imap==1.0.1
[2021-10-20 14:40:24,050] {{bash.py:173}} INFO - apache-airflow-providers-oracle==1.1.0
[2021-10-20 14:40:24,067] {{bash.py:173}} INFO - apache-airflow-providers-postgres==1.0.2
[2021-10-20 14:40:24,083] {{bash.py:173}} INFO - apache-airflow-providers-presto==1.0.2
[2021-10-20 14:40:24,098] {{bash.py:173}} INFO - apache-airflow-providers-sftp==1.2.0
[2021-10-20 14:40:24,458] {{bash.py:173}} INFO - apache-airflow-providers-slack==3.0.0
[2021-10-20 14:40:24,476] {{bash.py:173}} INFO - apache-airflow-providers-sqlite==1.0.2
[2021-10-20 14:40:24,492] {{bash.py:173}} INFO - apache-airflow-providers-ssh==1.3.0
[2021-10-20 14:40:24,513] {{bash.py:173}} INFO - apache-airflow-providers-tableau==1.0.0

Deployment

MWAA

Deployment details

I have provisioned an Apache Airflow 2.0.2 environment in MWAA and created a DAG to kick off an AWS Glue crawler. I had to configure IAM permissions so the MWAA workers can kick off the crawler and access the S3 buckets I want crawled, but other than that it is a pretty vanilla configuration.
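
For reference, a minimal sketch of the extra permissions involved, written as an IAM policy document in a Python dict. The actions and bucket ARNs are assumptions inferred from the DAG below, not the exact policy I attached:

# Sketch only: actions and resources are assumptions based on the DAG below.
GLUE_CRAWLER_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            # Let the MWAA execution role manage and run the crawler.
            "Effect": "Allow",
            "Action": [
                "glue:CreateCrawler",
                "glue:GetCrawler",
                "glue:UpdateCrawler",
                "glue:StartCrawler",
                "iam:PassRole",
            ],
            "Resource": "*",
        },
        {
            # Read access to the bucket being crawled.
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetObject"],
            "Resource": [
                "arn:aws:s3:::demo-airflow-ts-output",
                "arn:aws:s3:::demo-airflow-ts-output/*",
            ],
        },
    ],
}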

This is my DAG:

import os
from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue_crawler import AwsGlueCrawlerOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None,
) as dag:

    run_crawler = AwsGlueCrawlerOperator(
        task_id="run_crawler",
        aws_conn_id="aws_default",
        config={
            "Name": "airflow-timestream-crawler",
            "Role": "service-role/AWSGlueServiceRole-reinvent-glue-crawler",
            "DatabaseName": "reinvent-airflow-demo-crawler",
            "Description": "Crawler for airflow_timeseriesdb",
            "Targets": {
                "S3Targets": [
                    {
                        "Path": "s3://demo-airflow-ts-output",
                        "Exclusions": ["demo-airflow-flink/**", "files/**"],
                    }
                ]
            },
        },
    )

What happened

When I trigger the DAG, I get the following error:

[2021-10-19 18:31:48,509] {{logging_mixin.py:104}} INFO - [2021-10-19 18:31:48,509] {{glue_crawler.py:107}} INFO - Creating crawler: airflow-timestream-crawler
[2021-10-19 18:31:48,666] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/glue_crawler.py", line 73, in execute
    self.hook.create_crawler(**self.config)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue_crawler.py", line 108, in create_crawler
    return self.glue_client.create_crawler(**crawler_kwargs)['Crawler']['Name']
KeyError: 'Crawler'
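
The KeyError matches the shape of the boto3 responses: Glue's create_crawler returns only response metadata, with no 'Crawler' key, while get_crawler is the call that wraps its result in one. A minimal sketch, reusing the crawler config from the DAG above and assuming the crawler does not exist yet:

import boto3

glue = boto3.client("glue")

# create_crawler returns an empty body plus ResponseMetadata; there is no
# 'Crawler' key to index, which is exactly where the hook raises KeyError --
# after the crawler has already been created on the AWS side.
response = glue.create_crawler(
    Name="airflow-timestream-crawler",
    Role="service-role/AWSGlueServiceRole-reinvent-glue-crawler",
    DatabaseName="reinvent-airflow-demo-crawler",
    Targets={"S3Targets": [{"Path": "s3://demo-airflow-ts-output"}]},
)
print(response.get("Crawler"))  # None -> response['Crawler'] raises KeyError

# get_crawler, by contrast, does nest its result under a 'Crawler' key.
print(glue.get_crawler(Name="airflow-timestream-crawler")["Crawler"]["Name"])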

When I go to the AWS Glue console, I can see that the Crawler has been created.

When I trigger the DAG a second time, it works and the Crawler runs.
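
That behaviour is consistent with the operator branching on whether the crawler already exists, so only the create path hits the bad response lookup. A rough sketch of the 1.4.0 execute logic, with method names paraphrased from the provider source (treat them as approximate):

def execute(self, context):
    crawler_name = self.config["Name"]
    if self.hook.has_crawler(crawler_name):
        # Second and later runs: the crawler exists, so the update path is
        # taken and the failing create_crawler call is never reached.
        self.hook.update_crawler(**self.config)
    else:
        # First run: the crawler is created successfully via boto3, but the
        # hook then raises KeyError reading the name out of the response.
        self.hook.create_crawler(**self.config)
    self.hook.start_crawler(crawler_name)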

What you expected to happen

I would not expect this error on the first run, especially since the crawler itself is created successfully.

How to reproduce

Provision an MWAA 2.0.2 environment, create an S3 bucket with some CSV files, update IAM permissions so the environment can kick off Glue jobs, and then deploy the DAG above.
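
For example, seeding the bucket could look like this. The bucket name comes from the DAG above; the key and contents are hypothetical, and in regions other than us-east-1 create_bucket also needs a CreateBucketConfiguration:

import boto3

s3 = boto3.client("s3")
s3.create_bucket(Bucket="demo-airflow-ts-output")
s3.put_object(
    Bucket="demo-airflow-ts-output",
    Key="data/sample.csv",  # deliberately outside the DAG's exclusion patterns
    Body=b"sensor,value\ns1,42\n",
)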

Anything else

No response


boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

eladkal commented 3 years ago

Please update to the latest Amazon provider. The line your traceback points to:

  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue_crawler.py", line 108, in create_crawler
    return self.glue_client.create_crawler(**crawler_kwargs)['Crawler']['Name']

was removed in PR https://github.com/apache/airflow/pull/16012
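
For context, the shape of that fix (as I read the PR; the exact post-fix code may differ) is to return the crawler name from the request arguments instead of reading it back out of the boto3 response:

def create_crawler(self, **crawler_kwargs) -> str:
    """Create an AWS Glue crawler and return its name."""
    crawler_name = crawler_kwargs["Name"]
    self.log.info("Creating crawler: %s", crawler_name)
    self.glue_client.create_crawler(**crawler_kwargs)
    # boto3's create_crawler response carries no 'Crawler' key, so the name
    # has to come from the request kwargs, not the response.
    return crawler_name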

094459 commented 3 years ago

Thank you. If I understand correctly, the next stable version of apache-airflow-providers-amazon is 2.0, which (according to the docs on PyPI) requires Airflow 2.1 or newer. I will give it a try and see if it works, but I wonder whether it might cause other issues.