apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Task sync fails when using celery executor with AWS SQS broker. #26520

Closed: r4um closed this issue 2 years ago.

r4um commented 2 years ago

Apache Airflow version

Other Airflow 2 version (2.2.2; see the version details below)

What happened

The change in https://github.com/apache/airflow/pull/3830 introduced multiprocessing when syncing task status. When the Celery executor is used with an AWS SQS broker, DAG runs fail with the following traceback in the scheduler logs:

[2022-02-01 12:52:52,595] {celery_executor.py:299} ERROR - Error sending Celery task: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:
b''
Celery Task ID: TaskInstanceKey(dag_id='shared-dev_jobflow-airflow_k8s_job_multiple_tasks', task_id='t4', run_id='manual__2022-02-01T12:52:40.329850+00:00', try_number=1)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 480, in _parse_xml_string_to_dom
    root = parser.close()
  File "<string>", line None
xml.etree.ElementTree.ParseError: no element found: line 1, column 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 579, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 788, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 519, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 180, in publish
    exchange_name, declare, timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 524, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 99, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 110, in maybe_declare
    return _maybe_declare(entity, channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 150, in _maybe_declare
    entity.declare(channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 615, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 650, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 528, in queue_declare
    return queue_declare_ok_t(queue, self._size(queue), 0)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 607, in _size
    AttributeNames=['ApproximateNumberOfMessages'])
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 695, in _make_api_call
    operation_model, request_dict, request_context)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 714, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 135, in _send_request
    request, operation_model, context)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 166, in _get_response
    request, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 217, in _do_get_response
    response_dict, operation_model.output_shape)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 245, in parse
    parsed = self._do_parse(response, shape)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 551, in _do_parse
    return self._parse_body_as_xml(response, shape, inject_metadata=True)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 555, in _parse_body_as_xml
    root = self._parse_xml_string_to_dom(xml_contents)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 485, in _parse_xml_string_to_dom
    (e, xml_string))
botocore.parsers.ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed: 
b''

[2022-02-01 12:52:52,821] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-02-01 12:52:52,825] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-02-01 12:52:52,834] {taskinstance.py:1280} INFO - Marking task as FAILED. dag_id=shared-dev_jobflow-airflow_k8s_job_multiple_tasks, task_id=t4, execution_date=20220201T125240, start_date=, end_date=20220201T125252
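The innermost error in the traceback is botocore failing to parse an empty SQS response body (b'') as XML. The standard-library snippet below reproduces only that parse step. It follows the feed/close pattern visible in the traceback (parser.close() inside _parse_xml_string_to_dom), but parse_xml_body and describe_parse_failure are illustrative names, not botocore's code. An empty body on its own yields exactly the ParseError message from the log, which suggests the HTTP response arrived empty rather than as malformed XML.

```python
import xml.etree.ElementTree as ET


def parse_xml_body(body: bytes) -> ET.Element:
    """Parse a response body with an incremental feed/close XML parser."""
    parser = ET.XMLParser(target=ET.TreeBuilder(), encoding="utf-8")
    parser.feed(body)
    # close() is where an empty document is detected and ParseError is raised.
    return parser.close()


def describe_parse_failure(body: bytes) -> str:
    """Return the ParseError message for a body, or 'parsed OK' if it parses."""
    try:
        parse_xml_body(body)
    except ET.ParseError as exc:
        return str(exc)
    return "parsed OK"


print(describe_parse_failure(b""))                               # no element found: line 1, column 0
print(describe_parse_failure(b"<GetQueueAttributesResponse/>"))  # parsed OK
```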

This is likely because boto3 session and resource objects are not safe to share across threads or processes [1]. With sync_parallelism set to 1, the problem does not occur.
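For reference, sync_parallelism is an option in the [celery] section of airflow.cfg; its default of 0 lets Airflow derive the number of sync processes from the CPU count. The sketch below shows the workaround in both the airflow.cfg form and the equivalent AIRFLOW__{SECTION}__{KEY} environment variable. It only uses configparser to confirm the snippet parses and does not call Airflow itself; the helper names are illustrative.

```python
import configparser

# The workaround from this issue, written as an airflow.cfg fragment.
AIRFLOW_CFG_SNIPPET = """
[celery]
sync_parallelism = 1
"""


def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def read_sync_parallelism(cfg_text: str) -> int:
    """Read [celery] sync_parallelism from an airflow.cfg-style string."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getint("celery", "sync_parallelism")


print(airflow_env_var("celery", "sync_parallelism"))  # AIRFLOW__CELERY__SYNC_PARALLELISM
print(read_sync_parallelism(AIRFLOW_CFG_SNIPPET))     # 1
```

In a container deployment such as the EKS setup described below, setting AIRFLOW__CELERY__SYNC_PARALLELISM=1 on the scheduler is usually simpler than editing airflow.cfg.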

[1] https://boto3.amazonaws.com/v1/documentation/api/1.14.31/guide/session.html#multithreading-or-multiprocessing-with-sessions
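The boto3 guide cited in [1] recommends creating a separate Session (and clients from it) for each thread or process rather than sharing one. Below is a minimal, stdlib-only sketch of that pattern, assuming fork-based worker processes: clients are cached by process ID, so a forked child builds its own client instead of reusing the parent's. It is not kombu's or Airflow's code; PerProcessClient and make_fake_sqs_client are hypothetical, and the fake factory stands in for a real boto3 call.

```python
import os
from typing import Callable, Dict, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcessClient(Generic[T]):
    """Hypothetical helper: keep one client per OS process instead of sharing one.

    A child created by fork() inherits the parent's objects, including open
    HTTP connections. Keying the cache by PID makes each child build its own.
    """

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._clients: Dict[int, T] = {}

    def get(self, pid: Optional[int] = None) -> T:
        # Default to the current process; the pid argument exists only so the
        # behaviour can be demonstrated without actually forking.
        key = os.getpid() if pid is None else pid
        if key not in self._clients:
            self._clients[key] = self._factory()
        return self._clients[key]


def make_fake_sqs_client() -> object:
    # Stand-in for illustration. A real factory might be
    # boto3.session.Session().client("sqs"), following the boto3 guide [1].
    return object()


sqs_clients = PerProcessClient(make_fake_sqs_client)
first = sqs_clients.get()
print(first is sqs_clients.get())          # True: same process reuses its client
print(sqs_clients.get(pid=-1) is first)    # False: another "process" gets a new one
```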

What you think should happen instead

No response

How to reproduce

Use the Celery executor with AWS SQS as the Celery broker. Trigger a DAG with multiple tasks and dependencies among them, e.g. five tasks with the following dependencies:

t3 << [t1, t2]
t4 << [t3]
t5 << [t3]

Task type doesn't matter. In our case, it is the KubernetesPodOperator.
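Why would the dependency shape matter? A hedged reading is that fan-in/fan-out DAGs let the scheduler queue several tasks at once, and the 2.2-era CeleryExecutor appears to send a batch from the scheduler process itself only when the batch has a single task or sync_parallelism is 1, using a process pool otherwise. The sketch below models that idealized flow for the t1 to t5 DAG above. ready_waves and send_path are hypothetical names, the real scheduler batches tasks by timing rather than in strict waves, and sync_parallelism=4 is an arbitrary example value; verify the dispatch rule against your Airflow version.

```python
from typing import Dict, List, Set

# Upstream sets copied from the reproduction: t3 << [t1, t2], t4 << [t3], t5 << [t3]
UPSTREAM: Dict[str, Set[str]] = {
    "t1": set(),
    "t2": set(),
    "t3": {"t1", "t2"},
    "t4": {"t3"},
    "t5": {"t3"},
}


def ready_waves(upstream: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; each task's upstreams all finish in earlier waves."""
    done: Set[str] = set()
    remaining = dict(upstream)
    waves: List[List[str]] = []
    while remaining:
        wave = sorted(t for t, ups in remaining.items() if ups <= done)
        if not wave:
            raise ValueError("dependency cycle detected")
        waves.append(wave)
        done.update(wave)
        for t in wave:
            del remaining[t]
    return waves


def send_path(batch_size: int, sync_parallelism: int) -> str:
    """Assumed dispatch rule: send in-process for one task or parallelism 1."""
    if batch_size == 1 or sync_parallelism == 1:
        return "sequential"
    return "process pool"


for wave in ready_waves(UPSTREAM):
    print(wave, send_path(len(wave), sync_parallelism=4))
```

Under this model the [t1, t2] and [t4, t5] waves take the process-pool path, which matches the report that the failure goes away with sync_parallelism set to 1.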

Operating System

CentOS Linux 7 (Core)

Versions of Apache Airflow Providers

Airflow 2.2.2, Python 3.7.9. Packages were installed using the Airflow 2.2.2 constraints file.

Deployment

Other

Deployment details

Deployed as containers on AWS EKS.

Anything else

No response


boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!