aelzeiny / airflow-aws-executors

Airflow Executor for both AWS ECS & AWS Fargate
MIT License

WIP: Add in the ability to use multiple queues in AwsBatchExecutor #9

Closed: leonsmith closed this pull request 3 years ago

leonsmith commented 3 years ago

RE: https://github.com/aelzeiny/airflow-aws-executors/issues/8

Bit of a work in progress, but it shows the small tweak we need to be able to submit to multiple Batch job queues.

aelzeiny commented 3 years ago

First off, I didn't realize from the outset that this was going to be so complicated. I think part of the reason is that queues are purely a Celery concept in the current version of Airflow, and even if that gets fixed, it will still be a problem in older versions of Airflow (before v2). If we decide to support queues, the code is going to look much less elegant, with conditionals on the Airflow version, which I'm not strongly opposed to: form follows function.

I see that in Airflow 2.0 the queue is not being passed in, which is strange because that was not an issue in Airflow 1.x. Here's a screenshot from Airflow v1.10.5:
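
If the queue argument can arrive empty (as described above for Airflow 2.0), any queue-aware executor needs a fallback. Below is a minimal sketch, assuming a hypothetical helper resolve_job_queue, a hypothetical mapping from Airflow queue names to AWS Batch job queue names, and a hypothetical default job queue; none of these names come from the plugin.

```python
from typing import Mapping, Optional


def resolve_job_queue(
    airflow_queue: Optional[str],
    queue_map: Mapping[str, str],
    default_job_queue: str,
) -> str:
    """Map an Airflow queue name to an AWS Batch job queue (hypothetical helper).

    Falls back to default_job_queue when the queue is missing (e.g. not passed
    through by the scheduler) or has no entry in queue_map.
    """
    if not airflow_queue:
        return default_job_queue
    return queue_map.get(airflow_queue, default_job_queue)


# Hypothetical queue names, for illustration only.
queue_map = {"gpu": "batch-gpu-queue", "long_running": "batch-spot-queue"}
print(resolve_job_queue("gpu", queue_map, "batch-default-queue"))      # prints batch-gpu-queue
print(resolve_job_queue(None, queue_map, "batch-default-queue"))       # prints batch-default-queue
print(resolve_job_queue("unknown", queue_map, "batch-default-queue"))  # prints batch-default-queue
```

Treating an empty string the same as None keeps the fallback behaviour identical whichever way a given Airflow version omits the queue.
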

aelzeiny commented 3 years ago

Or, hear me out: I think the larger issue here is that this plugin is extremely rigid in what it lets you configure. I allow you to override containerOverrides but not jobQueue. What's the difference? Why does this plugin even care what you override? If a programmer wants to do something, well-designed code should just get out of the way.

So, basically, what's wrong with this? _submit_job will always call _submit_job_kwargs, and the programmer decides what the resulting structure looks like. That gives you infinite flexibility to do whatever you want when you implement your own executor, no?

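```python
# Assumes AwsBatchExecutor and the TaskInstanceKeyType, CommandType and
# ExecutorConfigType aliases are imported from the plugin's batch executor module.
class CustomBatchExecutor(AwsBatchExecutor):
    def _submit_job_kwargs(self, task_id: TaskInstanceKeyType, cmd: CommandType,
                           queue: str, exec_config: ExecutorConfigType) -> dict:
        # Start from the plugin's default submit_job kwargs.
        kwargs = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
        # Accept either spelling of the queue key and normalize it to Batch's 'jobQueue'.
        job_queue = kwargs.pop('job_queue', None) or kwargs.pop('jobQueue', None)
        if job_queue:
            kwargs['jobQueue'] = job_queue
        return kwargs
```
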
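
To illustrate the same hook pattern outside Airflow, here is a self-contained sketch. StubBatchExecutor is a hypothetical stand-in for AwsBatchExecutor, not the plugin's code: its _submit_job always builds the request through _submit_job_kwargs, so a subclass can reshape the request (here, routing by Airflow queue) without touching the submission logic. The keys jobName, jobQueue, jobDefinition and containerOverrides are real AWS Batch SubmitJob parameters; the queue and job definition names are made up, task_id is simplified to a string, and the real plugin's default kwargs may differ.

```python
from typing import Any, Dict, List, Optional


class StubBatchExecutor:
    """Hypothetical stand-in for AwsBatchExecutor (not the plugin's code)."""

    job_queue = "batch-default-queue"   # hypothetical default job queue
    job_definition = "airflow-job-def"  # hypothetical job definition

    def __init__(self) -> None:
        # Records requests instead of calling AWS.
        self.submitted: List[Dict[str, Any]] = []

    def _submit_job_kwargs(self, task_id: str, cmd: List[str],
                           queue: Optional[str], exec_config: Any) -> Dict[str, Any]:
        # Default request: fixed queue and definition, command as a container override.
        return {
            "jobName": task_id,
            "jobQueue": self.job_queue,
            "jobDefinition": self.job_definition,
            "containerOverrides": {"command": cmd},
        }

    def _submit_job(self, task_id: str, cmd: List[str],
                    queue: Optional[str], exec_config: Any) -> Dict[str, Any]:
        # Always goes through the hook; a real executor would pass these
        # kwargs to a boto3 Batch client's submit_job(**kwargs).
        kwargs = self._submit_job_kwargs(task_id, cmd, queue, exec_config)
        self.submitted.append(kwargs)
        return kwargs


class QueueRoutingExecutor(StubBatchExecutor):
    # Hypothetical mapping from Airflow queue names to Batch job queues.
    QUEUE_MAP = {"gpu": "batch-gpu-queue"}

    def _submit_job_kwargs(self, task_id, cmd, queue, exec_config):
        kwargs = super()._submit_job_kwargs(task_id, cmd, queue, exec_config)
        if queue in self.QUEUE_MAP:
            kwargs["jobQueue"] = self.QUEUE_MAP[queue]
        return kwargs


executor = QueueRoutingExecutor()
print(executor._submit_job("train_model", ["airflow", "tasks", "run"], "gpu", None)["jobQueue"])
# prints batch-gpu-queue
print(executor._submit_job("report", ["airflow", "tasks", "run"], None, None)["jobQueue"])
# prints batch-default-queue
```

The base class never needs to know which keys a subclass changes, which is the point made above: the plugin builds a sensible default and the subclass owns the final shape of the request.
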
leonsmith commented 3 years ago

Cheers for the comments and your thoughts! I didn't realise you were still targeting Airflow 1.x.

More than happy to go with allowing _submit_job_kwargs to be overridden!

aelzeiny commented 3 years ago

Cool, I have a PR up, but the newest version of Airflow requires a specific version of SQLAlchemy, and Travis CI is fighting me on this one. I'll get it done, though.

leonsmith commented 3 years ago

Closing in favour of https://github.com/aelzeiny/airflow-aws-executors/pull/10