nextflow-io / nextflow

A DSL for data-driven computational pipelines
http://nextflow.io
Apache License 2.0
2.61k stars 605 forks source link

Support for MPI jobs #3313

Open xhejtman opened 1 year ago

xhejtman commented 1 year ago

Hello,

we would like to run MPI jobs as processes from nextflow via kuberun. It means that at least for MPI jobs, there is a need for service object so that it is able to communicate.

Would you be opened for extending kuberun for MPI jobs?

pditommaso commented 1 year ago

Support for MPI I think means co-located jobs but I don't think it's should a kuberun specific ability.

It should be supported by the k8s executor in general.

xhejtman commented 1 year ago

Yes, could be supported by k8s executor. Probably simple solution is to utilize mpioperator (from kubeflow) and just spawn mpijob kind instead of job. Operator will do the rest. What do you think?

pditommaso commented 1 year ago

This sounds interesting. it should be assessed the impact of using mpijob instead of a pod or a job.

Also, would not it require some extra directive for the job? how does the mpioperator determine the jobs to be executed in the same node?

xhejtman commented 1 year ago

I do not mean mpijob should be used in general in all cases, just for the mpijob.

This is example of mpijob definition:

kind: MPIJob
metadata:
  name: mpijob
spec:
  slotsPerWorker: 1
  runPolicy:
    cleanPodPolicy: All
    ttlSecondsAfterFinished: 60
  sshAuthMountPath: /home/user/.ssh
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - image: IMAGE
            name: launcher
            command:
            - mpirun
            args:
            - -n
            - "2"
            - COMMAND
            securityContext:
              runAsUser: 1000
            resources:
              limits:
                cpu: 500m
                memory: 500Mi
    Worker:
      replicas: 2
      template:
        spec:
          containers:
          - image: IMAGE
            name: worker
            securityContext:
              runAsUser: 1000
            resources:
              limits:
                cpu: 2200m
                memory: 64Gi

The operator creates service mesh and pods. It is expecting sshd is installed inside the container (as mpirun is based on ssh).

pditommaso commented 1 year ago

I do not mean mpijob should be used in general in all cases, just for the mpijob

Understand that. But mpirun usually requires other parameters like number of nodes and jobs per node? How you are planning to provide that via nextflow ?

xhejtman commented 1 year ago

I think the operator is a bit simple and only allows to specify number of workers (not specifying layout of workers to nodes), so I believe, one extra parameter would be needed only - number of workers.

So, similarly to accelerator parameter, I would define mpiworkers parameter and if set, then spawn mpijob with this number of workers.

bentsherman commented 1 year ago

I think we can make this work with some additional pod options:

process foo {
    pod [
        computeResourceType: 'MPIJob',
        mpiWorkers: 2
    ]

    "COMMAND"
}

We can extend the computeResourceType enum we already use for jobs vs pods. We can assume default of 1 for slots per worker and launcher replicas or expose them as pod options as well.

The main caveats are:

pditommaso commented 1 year ago

@xhejtman should the mpijob be defined at the process level? currenty the computeResourceType is a top level k8s config

https://github.com/nextflow-io/nextflow/blob/master/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy#L165-L165

Also, other mpiWorkers aka replicas another important piece of information should be slotsPerWorker that I guess represents the number of tasks per node.

xhejtman commented 1 year ago

Process level, not all processes need to be mpijobs, I am afraid.

bentsherman commented 1 year ago

It's not clear to me what slots per worker means, because you can also specify the cpu request/limit for each worker, so you could use that to control the worker-to-node topology.

This article provides some explanation: https://cloud.redhat.com/blog/how-to-use-kubeflow-and-the-mpi-operator-on-openshift

xhejtman commented 1 year ago

I think it is about the fact, that mpirun can specify both number of nodes and number of workers per node. So it is not a single number.

bentsherman commented 1 year ago

The article suggests that slotsPerWorker is the number of MPI processes per worker, which means that it doesn't affect pod-to-node placement. It seems to me, the simplest approach would be to always set slotsPerWorker to 1 and allow the user to control the topology through the cpu/memory requests.

On the other hand, I wonder if slotsPerWorker is facilitating the local groups. Normally, MPI processes on the same node form a "local group" that can communicate with shared memory instead of message passing. So it may be that "worker" in the mpi operator refers to a worker node rather than a worker process.

xhejtman commented 1 year ago

So, can we implement this as it seems we agree to use mpi operator and its resources.

pditommaso commented 1 year ago

This looks valuable, however, it's a tradeoff on how much it will impact the current implementation. Feel free to provide a draft implementation to better evaluate it.

xhejtman commented 1 year ago

https://github.com/nextflow-io/nextflow/pull/3392

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 6 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.