aelzeiny / airflow-aws-executors

Airflow Executor for both AWS ECS & AWS Fargate

AwsBatchExecutor is unable to adopt orphaned task instances #14

Open mattellis opened 3 years ago

mattellis commented 3 years ago

We've been successfully running the Airflow Batch Executor in a production AWS environment for a number of weeks now, having previously run the Celery Executor on an EC2 ASG. Overall it's working very well, but we've had to patch the executor to solve a couple of minor issues for our Airflow implementation.

This may be intentional behaviour, but we recently discovered that the Batch Executor (and also the Fargate Executor) is unable to adopt orphaned tasks, and that it terminates all running Batch jobs when it receives the terminate() callback. We have a regular release cycle (every day or two) in which the scheduler instances get replaced. As per normal Airflow behaviour, the newly booted scheduler/executor will try to "adopt orphaned task instances" that were started by the terminated SchedulerJob(s).

However, with the AwsBatchExecutor we now get task failures every time the scheduler is deployed, which show up as a SIGTERM sent to each running LocalTaskJob on the Batch job containers. An example task log showing the issue:

[2021-04-29 10:02:05,102] {{logging_mixin.py:104}} INFO - Running <TaskInstance: ******** 2021-04-28T00:00:00+00:00 [running]> on host ********
[2021-04-29 10:02:05,606] {{taskinstance.py:1255}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=********
AIRFLOW_CTX_DAG_ID=********
AIRFLOW_CTX_TASK_ID=********
AIRFLOW_CTX_EXECUTION_DATE=2021-04-28T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-04-28T00:00:00+00:00
...
[2021-04-29 10:04:28,398] {{local_task_job.py:187}} WARNING - State of this instance has been externally set to None. Terminating instance.
[2021-04-29 10:04:28,399] {{process_utils.py:100}} INFO - Sending Signals.SIGTERM to GPID 117
[2021-04-29 10:04:28,400] {{taskinstance.py:1239}} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-04-29 10:04:28,580] {{taskinstance.py:1455}} ERROR - Task received SIGTERM signal
...
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/batch.py", line 148, in execute
    self.monitor_job(context)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/batch.py", line 205, in monitor_job
    raise AirflowException(e)
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-04-29 10:04:28,582] {{taskinstance.py:1496}} INFO - Marking task as FAILED.
[2021-04-29 10:04:28,652] {{process_utils.py:66}} INFO - Process psutil.Process(pid=117, status='terminated', exitcode=1, started='10:02:04') (117) terminated with exit code 1

Presumably the terminate method does this to clean up running Batch jobs when the scheduler/executor is shutting down for good rather than being replaced. However, for users with rolling scheduler deployments this causes unnecessary failures. To support adoption of orphaned tasks, the Batch Executor just needs to store the AWS Batch job_id in the TaskInstance.external_executor_id field when it submits a job, and implement the BaseExecutor.try_adopt_task_instances method. That method simply needs to pass each orphaned task instance's key and external_executor_id to the newly booted executor's active_workers.add_job method, as in the sketch below.
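To make that concrete, here is a minimal, untested sketch of what the adoption hook could look like as a method on AwsBatchExecutor. It assumes active_workers.add_job(job_id, task_key) takes the same arguments that execute_async already passes it, and it is gated behind the [batch].adopt_orphaned_task_instances option proposed further down:

```python
from typing import List

from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance


def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
    """Sketch of a try_adopt_task_instances override for AwsBatchExecutor.

    Re-attaches to Batch jobs started by a dead scheduler and returns the
    task instances that could NOT be adopted (the scheduler resets those).
    """
    # Proposed opt-in toggle; the option name and default are only a suggestion.
    if not conf.getboolean('batch', 'adopt_orphaned_task_instances', fallback=False):
        return tis

    not_adopted = []
    for ti in tis:
        if ti.external_executor_id:
            # external_executor_id is assumed to hold the AWS Batch job_id that
            # execute_async recorded at submission time, so the new executor can
            # simply resume polling that job instead of letting it be killed.
            self.active_workers.add_job(ti.external_executor_id, ti.key)
        else:
            not_adopted.append(ti)
    return not_adopted
```

On the submission side, execute_async would also need to report the Batch job_id so that the scheduler records it as TaskInstance.external_executor_id; the Celery executor does this, for example, by putting a (State.QUEUED, &lt;external id&gt;) tuple into its event buffer after sending each task.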

A test implementation yields the desired result, from the scheduler instance / batch executor logs:

[Screenshot (2021-05-06): scheduler / batch executor logs showing the orphaned task instances being adopted]

Given that some users may have a different deployment architecture, I propose that this behaviour be toggled by a configuration option such as [batch].adopt_orphaned_task_instances (bool). I'd be happy with a default of either True or False, depending on the main intention of this executor implementation.

PS. We were actually considering writing a Batch executor ourselves when we came across this implementation, with perfect timing, so thanks for open-sourcing this great executor solution. Hopefully it can be integrated into Airflow mainline at some point, as I believe it has real legs for anyone running a custom Airflow stack on AWS.