astronomer / airflow-provider-fivetran-async

A new Airflow Provider for Fivetran, maintained by Astronomer and Fivetran
Apache License 2.0

FivetranOperator fails when a connector goes to `rescheduled` state #107

Open SouravBhowmikDE opened 2 months ago

SouravBhowmikDE commented 2 months ago

Hi, we have a Google connector that goes into the `rescheduled` state every now and then. Whenever the connector goes into the `rescheduled` state, the FivetranOperator Airflow task fails. It seems that FivetranOperator has a bug and does not handle the `rescheduled` state correctly. Can someone please fix this issue as soon as possible?

Below are the logs from the failed FivetranOperator Airflow task:

[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1361}} INFO - Starting attempt 1 of 1
[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1382}} INFO - Executing <Task(FivetranOperator): fivetran_sync_google_ads_dcr> on 2024-09-04 10:37:00+00:00
[2024-09-04, 07:37:09 CDT] {{standard_task_runner.py:57}} INFO - Started process 21551 to run task
[2024-09-04, 07:37:09 CDT] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'fivetran_sync_google_ads_dcr', 'fivetran_sync_google_ads_dcr', 'scheduled__2024-09-04T10:37:00+00:00', '--job-id', '10888165', '--raw', '--subdir', 'DAGS_FOLDER/fivetran/fivetran_ingress.py', '--cfg-path', '/tmp/tmplk3d8cgu']
[2024-09-04, 07:37:09 CDT] {{standard_task_runner.py:85}} INFO - Job 10888165: Subtask fivetran_sync_google_ads_dcr
[2024-09-04, 07:37:09 CDT] {{task_command.py:416}} INFO - Running <TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [running]> on host ip-172-22-27-20.ec2.internal
[2024-09-04, 07:37:10 CDT] {{taskinstance.py:1662}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='fivetran_sync_google_ads_dcr' AIRFLOW_CTX_TASK_ID='fivetran_sync_google_ads_dcr' AIRFLOW_CTX_EXECUTION_DATE='2024-09-04T10:37:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-09-04T10:37:00+00:00'
[2024-09-04, 07:37:10 CDT] {{base.py:73}} INFO - Using connection ID 'data_fivetran' for task execution.
[2024-09-04, 07:37:10 CDT] {{hooks.py:320}} INFO - Connector type: google_ads, connector schema: google_ads_dcr
[2024-09-04, 07:37:10 CDT] {{hooks.py:321}} INFO - Connectors logs at https://fivetran.com/dashboard/connectors/google_ads/google_ads_dcr/logs
[2024-09-04, 07:37:11 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = scheduled
[2024-09-04, 07:37:11 CDT] {{taskinstance.py:1526}} INFO - Pausing task as DEFERRED. dag_id=fivetran_sync_google_ads_dcr, task_id=fivetran_sync_google_ads_dcr, execution_date=20240904T103700, start_date=20240904T123709
[2024-09-04, 07:37:12 CDT] {{local_task_job_runner.py:225}} INFO - Task exited with return code 100 (task deferral)
[2024-09-04, 07:37:13 CDT] {{base.py:73}} INFO - Using connection ID 'data_fivetran' for task execution.
[2024-09-04, 07:37:13 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = syncing
[2024-09-04, 07:37:13 CDT] {{triggers.py:89}} INFO - sync is still running...
[2024-09-04, 07:37:13 CDT] {{triggers.py:90}} INFO - sleeping for 30 seconds.
[2024-09-04, 07:37:43 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = syncing
[2024-09-04, 07:37:43 CDT] {{triggers.py:89}} INFO - sync is still running...
[2024-09-04, 07:37:43 CDT] {{triggers.py:90}} INFO - sleeping for 30 seconds.
[2024-09-04, 07:38:13 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = rescheduled
[2024-09-04, 07:38:13 CDT] {{hooks.py:514}} INFO - Connector is in "rescheduled" state and needs to be manually restarted
[2024-09-04, 07:38:13 CDT] {{hooks.py:556}} INFO - Starting connector again in 0 seconds
[2024-09-04, 07:38:13 CDT] {{hooks.py:570}} INFO - Restarting connector now
[2024-09-04, 07:38:13 CDT] {{triggerer_job_runner.py:599}} INFO - Trigger fivetran_sync_google_ads_dcr/scheduled__2024-09-04T10:37:00+00:00/fivetran_sync_google_ads_dcr/-1/1 (ID 4971) fired: TriggerEvent<{'status': 'error', 'message': "'BasicAuth' object is not callable"}>
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1359}} INFO - Resuming after deferral
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1382}} INFO - Executing <Task(FivetranOperator): fivetran_sync_google_ads_dcr> on 2024-09-04 10:37:00+00:00
[2024-09-04, 07:38:22 CDT] {{standard_task_runner.py:57}} INFO - Started process 22523 to run task
[2024-09-04, 07:38:22 CDT] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'fivetran_sync_google_ads_dcr', 'fivetran_sync_google_ads_dcr', 'scheduled__2024-09-04T10:37:00+00:00', '--job-id', '10888173', '--raw', '--subdir', 'DAGS_FOLDER/fivetran/fivetran_ingress.py', '--cfg-path', '/tmp/tmpkx044i49']
[2024-09-04, 07:38:22 CDT] {{standard_task_runner.py:85}} INFO - Job 10888173: Subtask fivetran_sync_google_ads_dcr
[2024-09-04, 07:38:22 CDT] {{task_command.py:416}} INFO - Running <TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [running]> on host ip-172-22-24-53.ec2.internal
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1937}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1606, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fivetran_provider_async/operators.py", line 170, in execute_complete
    raise AirflowException(msg)
airflow.exceptions.AirflowException: error: 'BasicAuth' object is not callable
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1400}} INFO - Marking task as FAILED. dag_id=fivetran_sync_google_ads_dcr, task_id=fivetran_sync_google_ads_dcr, execution_date=20240904T103700, start_date=20240904T123709, end_date=20240904T123822
[2024-09-04, 07:38:22 CDT] {{base.py:73}} INFO - Using connection ID 'slack_conn_id' for task execution.
[2024-09-04, 07:38:22 CDT] {{base.py:73}} INFO - Using connection ID 'slack_conn_id' for task execution.
[2024-09-04, 07:38:23 CDT] {{standard_task_runner.py:104}} ERROR - Failed to execute job 10888173 for task fivetran_sync_google_ads_dcr (error: 'BasicAuth' object is not callable; 22523)
[2024-09-04, 07:38:23 CDT] {{local_task_job_runner.py:228}} INFO - Task exited with return code 1
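
For anyone reading the trigger event above: the text `'BasicAuth' object is not callable` is the message Python attaches to a `TypeError` when an object instance is invoked as if it were a function. The sketch below only reproduces that message in isolation. The `BasicAuth` namedtuple and the `describe_failure` helper are hypothetical stand-ins made up for illustration, not the provider's code, and this is not a confirmed diagnosis of where the restart path goes wrong.

```python
from collections import namedtuple

# Hypothetical stand-in for a credentials object (an assumption for this
# sketch; it is NOT the class used by the provider or by any HTTP library).
BasicAuth = namedtuple("BasicAuth", ["login", "password"])


def describe_failure(auth):
    """Invoke `auth` like a function and return the resulting error text."""
    try:
        auth()  # an instance without __call__ cannot be invoked
    except TypeError as exc:
        return str(exc)
    return "no error"


# Calling the instance (instead of passing it along as a value) fails.
message = describe_failure(BasicAuth("api_key", "api_secret"))
print(message)
```

If the restart path does something similar, for example calling an already-built auth object where a value is expected, it would surface exactly this message, but that would need to be confirmed against the hook code referenced in the traceback.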

pankajastro commented 2 months ago

Hi @SouravBhowmikDE, would you be interested in submitting a fix? I’d be happy to review and merge it.

SouravBhowmikDE commented 2 months ago

Hi @pankajastro, sure, I will give this a try in a few days. I have never done this before.