ejjordan / airflowHPC


Rework `SlurmHook` #39

Open ejjordan opened 1 week ago

ejjordan commented 1 week ago

Currently, the SlurmHook collaborates with the ResourceExecutor and the resource operators ResourceBashOperator and ResourceGmxOperator to manage the cores and GPUs assigned to different tasks. In the current structure, the SlurmHook inside the resource executor keeps track of which cores/GPUs are busy or free, and uses environment variables to tell the resource operators which cores and GPUs to use. This sometimes leads to a problem: new tasks are assigned their specific cores/GPUs before previously completed tasks have marked their resources as released. The executor's heartbeat method calls the sync method, which is what actually marks resources as no longer in use. The heartbeat runs only periodically, while the task scheduler polls the DB continuously to see whether tasks are complete and new tasks can be launched, typically launching new tasks within 0.01 seconds of their upstream tasks completing. Thus the SlurmHook is not working as intended at managing resource usage.
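A minimal self-contained sketch (hypothetical class names, not the actual SlurmHook code) of the race described above: resources are only freed on a periodic heartbeat sync, while assignment happens almost immediately after an upstream task finishes.

```python
# Sketch of the heartbeat-lag race: cores freed only on periodic sync,
# while new tasks are assigned cores almost immediately.
class CoreTracker:
    def __init__(self, n_cores):
        self.free = set(range(n_cores))
        self.pending_release = []  # finished tasks not yet synced

    def assign(self, n):
        if len(self.free) < n:
            raise RuntimeError("cores still marked busy: release lagging behind")
        return {self.free.pop() for _ in range(n)}

    def task_finished(self, cores):
        # task is done, but cores are NOT freed until the next heartbeat
        self.pending_release.append(cores)

    def heartbeat_sync(self):
        for cores in self.pending_release:
            self.free |= cores
        self.pending_release.clear()

tracker = CoreTracker(n_cores=4)
a = tracker.assign(4)          # task A takes all cores
tracker.task_finished(a)       # task A completes...
try:
    tracker.assign(4)          # ...but task B launches before the heartbeat
except RuntimeError as e:
    print(e)                   # assignment fails: release is lagging behind
tracker.heartbeat_sync()       # periodic sync finally frees the cores
b = tracker.assign(4)          # now assignment succeeds
```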

The solution to this issue is to rewrite both the ResourceExecutor and the resource operators to reflect the intended functionality of Airflow Hooks, as outlined here: https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#hooks

Basically, the operators need to be rewritten so that their execute methods call the appropriate methods of the SlurmHook, something like the following.

class ResourceGmxOperator(BaseOperator):
    def __init__(self, ..., conn_id: str, database: str, **kwargs) -> None:
        # existing setup stuff
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.database = database

    def execute(self, context):
        hook = SlurmHook(conn_id=self.conn_id, schema=self.database)
        self.core_ids, self.gpu_ids = hook.assign_resources(self.executor_config)

        # the actual execution of the task specified by the operator

        hook.release_resources(self.core_ids, self.gpu_ids)

        return output_files_paths

In order for this to work, the SlurmHook needs to be given a DB backend with a schema for keeping track of the total available resources (node names, core/GPU IDs, etc.), as well as which resources are currently free or occupied.
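One possible shape for that schema (a hypothetical sketch using an in-memory SQLite DB, not a proposal for the actual backend): one row per core/GPU slot, with a nullable owner column marking occupancy, so assignment is a transactional claim of free rows.

```python
import sqlite3

# Hypothetical bookkeeping schema: a nodes table for totals and a
# resources table with one row per core/GPU slot; owner_ti NULL = free.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE nodes (
    node_name TEXT PRIMARY KEY,
    n_cores   INTEGER NOT NULL,
    n_gpus    INTEGER NOT NULL
);
CREATE TABLE resources (
    node_name TEXT NOT NULL REFERENCES nodes(node_name),
    kind      TEXT NOT NULL CHECK (kind IN ('core', 'gpu')),
    slot_id   INTEGER NOT NULL,
    owner_ti  TEXT,              -- NULL means free; else a TaskInstance key
    PRIMARY KEY (node_name, kind, slot_id)
);
""")
conn.execute("INSERT INTO nodes VALUES ('n0', 4, 0)")
conn.executemany(
    "INSERT INTO resources VALUES ('n0', 'core', ?, NULL)",
    [(i,) for i in range(4)],
)

# assign_resources: claim two free cores inside one transaction
with conn:
    rows = conn.execute(
        "SELECT slot_id FROM resources WHERE kind='core' AND owner_ti IS NULL LIMIT 2"
    ).fetchall()
    conn.executemany(
        "UPDATE resources SET owner_ti='dag.task.run' "
        "WHERE kind='core' AND slot_id=?",
        rows,
    )

free = conn.execute(
    "SELECT COUNT(*) FROM resources WHERE kind='core' AND owner_ti IS NULL"
).fetchone()[0]
print(free)  # 2 cores remain free
```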

This change may need to be coordinated with changes to the ResourceExecutor heartbeat method so that tasks are only added to the task_queue once resources are available and all upstream tasks are done. Indeed, a simple workaround might be to just make sure the heartbeat only adds tasks to the task_queue once all upstream tasks are completed, since this should ensure that resources from previously completed tasks have definitely been freed.
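The gating condition described above can be sketched as a pure function (all names hypothetical; the real heartbeat would read task states and free resources from the DB rather than from dicts):

```python
# Hypothetical sketch: queue a task only when every upstream task has
# succeeded AND enough cores are currently free.
def ready_to_queue(task_id, upstream, task_states, free_cores, cores_needed):
    upstream_done = all(
        task_states.get(u) == "success" for u in upstream.get(task_id, [])
    )
    return upstream_done and free_cores >= cores_needed

states = {"grompp": "success", "mdrun": "running"}
deps = {"analyze": ["grompp", "mdrun"]}
print(ready_to_queue("analyze", deps, states, free_cores=8, cores_needed=4))  # False
states["mdrun"] = "success"
print(ready_to_queue("analyze", deps, states, free_cores=8, cores_needed=4))  # True
```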

ejjordan commented 1 week ago

> Indeed, a simple workaround might be to just make sure that the heartbeat only adds tasks to the task_queue once all upstream tasks are completed, as this might ensure that resources from previously completed tasks are definitely freed.

This does not seem to be possible, since getting the TaskInstances via, e.g.

    from airflow.models.taskinstance import TaskInstance
    from airflow.models.taskinstancekey import TaskInstanceKey
    from airflow.utils.session import NEW_SESSION, provide_session
    from sqlalchemy import select
    from sqlalchemy.orm import Session

    @provide_session
    def get_tis(self, ti_keys: list[TaskInstanceKey], session: Session = NEW_SESSION) -> list[TaskInstance]:
        """Fetch the TaskInstance rows matching the given keys."""
        tis: list[TaskInstance] = []
        for key in ti_keys:
            query = select(TaskInstance).where(
                TaskInstance.dag_id == key.dag_id,
                TaskInstance.task_id == key.task_id,
                TaskInstance.run_id == key.run_id,
                TaskInstance.map_index == key.map_index,
            )
            ti = session.scalars(query).one()
            tis.append(ti)
        return tis

does not result in TIs with the task attribute populated. IIRC, to get a TI with TI.task not None you have to get the TIs from the DagRun, and even this only works with the scheduler job runner and not the backfill job runner.
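A minimal stand-in sketch (stub classes, not the Airflow models) of the situation described above: a TI loaded straight from the DB carries no task attribute, so the task object has to be re-attached from the in-memory DAG before any code that needs TI.task can run.

```python
# Stub classes mimicking the shape of the Airflow objects, only to
# illustrate re-attaching .task from the in-memory DAG.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id

class Dag:
    def __init__(self, tasks):
        self._tasks = {t.task_id: t for t in tasks}

    def get_task(self, task_id):
        return self._tasks[task_id]

class TaskInstance:
    def __init__(self, task_id):
        self.task_id = task_id
        self.task = None  # what a bare DB query gives you

dag = Dag([Task("mdrun")])
ti = TaskInstance("mdrun")          # as returned by the select() query above
assert ti.task is None              # .task is not populated from the DB
ti.task = dag.get_task(ti.task_id)  # re-attach from the DAG
print(ti.task is not None)          # True
```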