This is an AWS Executor that delegates every task to a scheduled container on either AWS Batch, AWS Fargate, or AWS ECS.
```bash
pip install airflow-aws-executors
```
For AWS Batch: Getting Started with AWS Batch ReadMe
For AWS ECS/Fargate: Getting Started with AWS ECS/Fargate ReadMe
There's so much to unpack here.
AWS Batch can be seen as very similar to the Celery Executor, but WITH autoscaling. AWS will magically provision and tear down instances. AWS will magically monitor each container and store its status for ~24 hours. AWS will determine when to autoscale based on the amount of time and the number of tasks in the queue.
In contrast, Celery can scale up, but it doesn't have a good scaling-down story (based on personal experience). If you look at Celery's docs about autoscaling, you'll find APIs for scaling the number of threads on one server, and that doesn't even work. Each Celery worker is the user's responsibility to provision and maintain at a fixed capacity. The Celery backend and worker queue also need attention and maintenance. I've tried autoscaling an ECS cluster on my own with CloudWatch Alarms on SQS triggering CloudWatch Events, triggering capacity providers, triggering Application Auto Scaling groups, and it was a mess that I never got to work properly.
If you're on the Fargate executor, it may take ~2.5 minutes for a task to start, but at least that's a constant O(1) launch time. This way, the concept of tracking DAG Landing Times becomes unnecessary. If you have more than 2000 concurrent tasks (which is a lot), you can always contact AWS to request an increase to this soft limit.
I almost always recommend that you go the AWS Batch route, especially since, as of Dec 2020, AWS Batch supports Fargate deployments. So unless you need some very custom flexibility provided by ECS, or have a particular reason to use AWS Fargate directly, go with AWS Batch.
AWS Batch
- Is built on top of ECS, but has additional features for batch-job management, including auto-scaling the servers in an ECS cluster up and down based on the jobs submitted to a queue. Generally easier to configure and set up than either of the other options.
AWS Fargate
- Is a serverless container orchestration service; comparable to a proprietary AWS version of Kubernetes. Launching a Fargate Task is like saying "I want these containers to be launched somewhere in the cloud with X CPU and Y memory, and I don't care about the server". AWS Fargate is built on top of AWS ECS, and is easier to manage and maintain. However, it provides less flexibility.
AWS ECS
- Is known as "Elastic Container Service", which is a container orchestration service that uses a designated cluster of EC2 instances that you operate, own, and maintain.
|  | Batch | Fargate | ECS |
|---|---|---|---|
| Start-up per task | Combines both, depending on whether the job queue is Fargate serverless | 2-3 minutes per task; O(1) constant time | Instant 3s, or until capacity is available |
| Maintenance | You own, operate, and patch the servers, OR serverless (as of Dec 2020) | Serverless | You own, operate, and patch the servers |
| Capacity | Autoscales to the configurable Max vCPUs of the compute environment | ~2000 containers. See AWS Limits | Fixed. Not auto-scaling |
| Flexibility | Combines both, depending on whether the job queue is Fargate serverless | Low. Can only do what AWS allows in Fargate | High. Almost anything you can do on an EC2 |
| Fractional CPUs? | Yes, as of Dec 2020 a task can have 0.25 vCPUs | Yes. A task can have 0.25 vCPUs | Yes. A task can have 0.25 vCPUs |
This means that you can specify CPU, memory, env vars, and GPU requirements on a task. An executor config specified on a task will be merged directly into the Batch.submit_job() request kwargs.
For example:
```python
# Airflow 2.x import path; on 1.10.x use airflow.operators.python_operator instead
from airflow.operators.python import PythonOperator

task = PythonOperator(
    python_callable=lambda *args, **kwargs: print('hello world'),
    task_id='say_hello',
    executor_config=dict(
        vcpus=1,
        memory=512
    ),
    dag=dag
)
```
An executor config specified on a task will be merged into the ECS.run_task() request kwargs as a container override for the Airflow container. Refer to AWS's documentation on Container Overrides for a full list of kwargs.
For example:
```python
# Airflow 2.x import path; on 1.10.x use airflow.operators.python_operator instead
from airflow.operators.python import PythonOperator

task = PythonOperator(
    python_callable=lambda *args, **kwargs: print('hello world'),
    task_id='say_hello',
    executor_config=dict(
        cpu=256,  # 0.25 fractional vCPUs
        memory=512
    ),
    dag=dag
)
```
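Because the executor config is applied as a container override, other fields from ECS's ContainerOverride structure should pass through as well. Below is a minimal sketch under that assumption (and assuming a dag object as in the examples above); the environment-variable name and value are purely illustrative:

```python
from airflow.operators.python import PythonOperator

task = PythonOperator(
    python_callable=lambda *args, **kwargs: print('hello world'),
    task_id='say_hello_with_env',
    executor_config=dict(
        cpu=256,
        memory=512,
        # any ContainerOverride field should pass through; name/value are illustrative
        environment=[{'name': 'MY_FLAG', 'value': 'true'}]
    ),
    dag=dag
)
```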
[batch]
- region
- job_name
- job_queue
- job_definition
- submit_job_kwargs (see the Extensibility section below)

[ecs_fargate]
- region
- cluster
- container_name
- task_definition
- launch_type
- platform_version
- assign_public_ip
- security_groups
- subnets
- run_task_kwargs (see the Extensibility section below)

NOTE: Modify airflow.cfg or export environment variables. For example:
```bash
AIRFLOW__ECS_FARGATE__REGION="us-west-2"
```
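The same pattern works for the [batch] section. A minimal sketch; the values below are assumptions for illustration and must match the job queue and job definition that exist in your own AWS account:

```bash
# illustrative values only; point these at your own Batch resources
export AIRFLOW__BATCH__REGION="us-west-2"
export AIRFLOW__BATCH__JOB_NAME="airflow-task"
export AIRFLOW__BATCH__JOB_QUEUE="my-airflow-job-queue"
export AIRFLOW__BATCH__JOB_DEFINITION="my-airflow-job-definition"
```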
There are many different ways to schedule an ECS, Fargate, or Batch container. You may want specific container overrides, environment variables, subnets, retries, etc. This project does not attempt to wrap the AWS API. These technologies are ever-evolving, and it would be impossible to keep up with AWS's innovations. Instead, the executors let the user supply their own configuration in the form of Python dictionaries, which are then passed directly to Boto3's run_task or submit_job functions as **kwargs. This allows for maximum flexibility and little maintenance.
In this example we will modify the default submit_job_kwargs config. Note, however, that there is nothing stopping us from completely overriding it and providing our own config. If we do so, be sure to specify the mandatory Airflow configurations from the section above.
For example:
```bash
# exporting env vars in this way is like modifying airflow.cfg
export AIRFLOW__BATCH__SUBMIT_JOB_KWARGS="custom_module.CUSTOM_SUBMIT_JOB_KWARGS"
```

```python
# filename: AIRFLOW_HOME/plugins/custom_module.py
from copy import deepcopy

from airflow_aws_executors.conf import BATCH_SUBMIT_JOB_KWARGS

# Add retries & a 24-hour timeout to the default config
CUSTOM_SUBMIT_JOB_KWARGS = deepcopy(BATCH_SUBMIT_JOB_KWARGS)
CUSTOM_SUBMIT_JOB_KWARGS['retryStrategy'] = {'attempts': 3}
CUSTOM_SUBMIT_JOB_KWARGS['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}  # 24 hours
```
"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"
```python
# import path assumed; adjust to wherever AwsBatchExecutor lives in your installation
from airflow_aws_executors.batch_executor import AwsBatchExecutor

class CustomBatchExecutor(AwsBatchExecutor):
    def _submit_job_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
        submit_job_api = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
        if queue == 'long_tasks_queue':
            # give jobs on the long-running queue extra retries and a 24-hour timeout
            submit_job_api['retryStrategy'] = {'attempts': 3}
            submit_job_api['timeout'] = {'attemptDurationSeconds': 24 * 60 * 60}
        return submit_job_api
```
In this example we will modify the default run_task_kwargs config. Note, however, that there is nothing stopping us from completely overriding it and providing our own config. If we do so, be sure to specify the mandatory Airflow configurations from the section above.
For example:
```bash
# exporting env vars in this way is like modifying airflow.cfg
export AIRFLOW__ECS_FARGATE__RUN_TASK_KWARGS="custom_module.CUSTOM_RUN_TASK_KWARGS"
```

```python
# filename: AIRFLOW_HOME/plugins/custom_module.py
from copy import deepcopy

from airflow_aws_executors.conf import ECS_FARGATE_RUN_TASK_KWARGS

# Add environment variables to the container overrides
CUSTOM_RUN_TASK_KWARGS = deepcopy(ECS_FARGATE_RUN_TASK_KWARGS)
CUSTOM_RUN_TASK_KWARGS['overrides']['containerOverrides'][0]['environment'] = [
    {'name': 'CUSTOM_ENV_VAR', 'value': 'environment variable value'}
]
```
"I need more levers!!! I should be able to make changes to how the API gets called at runtime!"
```python
# import path assumed; adjust to wherever AwsFargateExecutor lives in your installation
from airflow_aws_executors.ecs_fargate_executor import AwsFargateExecutor

class CustomFargateExecutor(AwsFargateExecutor):
    def _run_task_kwargs(self, task_id, cmd, queue, exec_config) -> dict:
        run_task_api = super()._run_task_kwargs(task_id, cmd, queue, exec_config)
        if queue == 'long_tasks_queue':
            # ECS.run_task() has no retry/timeout kwargs like Batch does; as one example,
            # tag tasks from this queue so long-running work can be identified downstream
            run_task_api['tags'] = [{'key': 'queue', 'value': 'long_tasks_queue'}]
        return run_task_api
```
Please file a ticket in GitHub for issues. Be persistent and be polite.
This repository uses GitHub Actions for CI, pytest for integration/unit tests, and isort + pylint for code style. Pythonic type-hinting is encouraged. From the bottom of my heart, thank you to everyone who has contributed to making Airflow better.