aelzeiny / airflow-aws-executors

Airflow Executor for both AWS ECS & AWS Fargate
MIT License

Multiple queue support? #8

Closed leonsmith closed 3 years ago

leonsmith commented 3 years ago

Looks like this only supports a single queue (BatchExecutor)? Any objections to tweaking it so multiple queues can be supported?

If not, I can work on this tomorrow and hopefully get a quick proof of concept.
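For context, here is a rough sketch of what multiple-queue support could look like on the AWS Batch side, where the task's Airflow `queue` picks the Batch job queue to submit to. The queue mapping, helper name, and job definition name below are hypothetical, not the executor's actual API:

    import boto3

    # Hypothetical illustration: route each task to an AWS Batch job queue
    # based on the task's Airflow `queue` attribute, falling back to a default.
    QUEUE_MAP = {
        'default': 'airflow-default-job-queue',
        'high-memory': 'airflow-high-memory-job-queue',
    }

    batch_client = boto3.client('batch')

    def submit_to_batch(job_name: str, command: list, queue: str = 'default') -> str:
        """Submit a task's command to the Batch job queue mapped to `queue`."""
        response = batch_client.submit_job(
            jobName=job_name,
            jobQueue=QUEUE_MAP.get(queue, QUEUE_MAP['default']),
            jobDefinition='airflow-task',  # hypothetical job definition name
            containerOverrides={'command': command},
        )
        return response['jobId']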

leonsmith commented 3 years ago

So it looks like the BaseExecutor won't pass the queue in trigger_tasks, which is a bit odd given that the execute_async interface is supposed to look like the following:

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Any] = None,
    ) -> None:  # pragma: no cover
        raise NotImplementedError()

We can override trigger_tasks to pass the queue through (like the CeleryExecutor does).
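A minimal sketch of such an override, assuming the Airflow 2.x BaseExecutor layout where self.queued_tasks maps a TaskInstanceKey to a (command, priority, queue, task_instance) tuple (field order and attribute names may differ between Airflow versions; the subclass name is illustrative):

    from airflow.executors.base_executor import BaseExecutor

    class QueueAwareExecutor(BaseExecutor):
        def trigger_tasks(self, open_slots: int) -> None:
            # Pop the highest-priority queued tasks, much like BaseExecutor does ...
            sorted_queue = sorted(
                self.queued_tasks.items(),
                key=lambda item: item[1][1],  # sort by priority weight
                reverse=True,
            )
            for _ in range(min(open_slots, len(self.queued_tasks))):
                key, (command, _, queue, ti) = sorted_queue.pop(0)
                self.queued_tasks.pop(key)
                self.running.add(key)
                # ... but forward the task's queue instead of dropping it.
                self.execute_async(
                    key=key,
                    command=command,
                    queue=queue,
                    executor_config=ti.executor_config,
                )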

But this also means we always get a queue argument with the value set to the celery.default_queue config option.

I think, given that queue is exposed at the operator level and looks to be an Airflow feature (not something Celery-specific), the config option living under celery is wrong; it should be in the operator config section, which would allow other executors like this one to re-use it.
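To illustrate the distinction (the executor-agnostic section name below is hypothetical, just what such a lookup might look like):

    from airflow.configuration import conf

    # Today: any executor that wants the default queue has to read it from the
    # Celery-specific section.
    default_queue = conf.get('celery', 'default_queue')

    # Proposal: an executor-agnostic section (name is illustrative), so
    # non-Celery executors such as this one can share the same setting.
    # default_queue = conf.get('operators', 'default_queue')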

leonsmith commented 3 years ago

Opened this tweak upstream in Airflow:

https://github.com/apache/airflow/issues/14696

aelzeiny commented 3 years ago

I’ll get back to you tonight about this. Thanks for your patience.


aelzeiny commented 3 years ago

Sorry, I didn't get to this yesterday, but I promise I will soon. Thank you again for your patience.

leonsmith commented 3 years ago

Closing in favour of #10