galaxyproject / pulsar

Distributed job execution application built for Galaxy
https://pulsar.readthedocs.io
Apache License 2.0
37 stars 50 forks source link

Support Container Scheduling with Azure Batch #335

Open jmchilton opened 1 year ago

jmchilton commented 1 year ago

It looks like the Azure API supports preparation and completion tasks beside container tasks.

This should mean setting up task dependency between pulsar and tool containers (biocontainers not containing pulsar) code can mimic TES support pretty directly. Documentation for how this works is available on some level:

Diagrams without and with MQ available:


The TES runner was added in https://github.com/galaxyproject/pulsar/pull/302 and a similar pattern should work for Azure. Most of the relevant code is in client.py - for instance https://github.com/galaxyproject/pulsar/blob/master/pulsar/client/client.py#L687.

I think the idea would be implementing a

class LaunchesAzureContainersMixin(CoexecutionLaunchMixin):

that mirrors LaunchesTesContainersMixin(CoexecutionLaunchMixin).

And then mirror the TES job clients:

class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
    """A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""

    def __init__(self, destination_params, job_id, client_manager):
        super().__init__(destination_params, job_id, client_manager)
        self._setup_tes_client_properties(destination_params)

class TesMessageCoexecutionJobClient(BaseMessageCoexecutionJobClient, LaunchesTesContainersMixin):
    """A client that co-executes pods via GA4GH TES and doesn't depend on amqp for status updates."""

    def __init__(self, destination_params, job_id, client_manager):
        super().__init__(destination_params, job_id, client_manager)
        self._setup_tes_client_properties(destination_params)

But setting up relevant azure properties.

After that is setup - I think build_client_manager in client_manager.py would need to dispatch on some relevant Azure connection properties to realize that PollingJobClientManager should be used:

def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
    if 'job_manager' in kwargs:
        return ClientManager(**kwargs)  # TODO: Consider more separation here.
    elif kwargs.get('amqp_url', None):
        return MessageQueueClientManager(**kwargs)
    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"):
        return PollingJobClientManager(**kwargs)
    else:
        return ClientManager(**kwargs)

MessageQueueClientManager and PollingJobClientManager would need to be updated to dispatch on these and produce the relevant clients.

That is all that should be strictly needed - but mirroring K8S and TES with connivence runners optimized for this configuration and test setups in Galaxy would be wonderful. See https://github.com/galaxyproject/galaxy/pull/14777/files for how to do this.