kubeflow / mpi-operator

Kubernetes Operator for MPI-based applications (distributed training, HPC, etc.)
https://www.kubeflow.org/docs/components/training/mpi/
Apache License 2.0

MPIJobs with Kubernetes Python SDK #582

Closed AymenFJA closed 11 months ago

AymenFJA commented 11 months ago

Dear all,

I hope you can help me with the following:

I am trying to submit MPIJobs to a Kubernetes cluster using the Kubernetes Python client.

So far, I could not find documentation explaining how to do so. Can you please guide me on how to do that?

FYI: I tried the following:

client.CustomObjectsApi().create_namespaced_custom_object('kubeflow.org','v2beta1', 'mpijob', 'default', jobs[0])

yet Kubernetes complains about it:

ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'f24fa43b-e92f-4a3c-94cb-35d1911e3f9c', 'Cache-Control': 'no-cache, private', 'Content-Type': 'text/plain; charset=utf-8', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '80bb94bb-70e1-4cd5-b057-69d6a21aeca9', 'X-Kubernetes-Pf-Prioritylevel-Uid': '9371364a-2f5b-44ac-b0b7-edf497140903', 'Date': 'Tue, 18 Jul 2023 21:14:47 GMT', 'Content-Length': '19'})
HTTP response body: 404 page not found

Example of my jobs[0]:

 {'apiVersion': 'kubeflow.org/v2beta1',
  'kind': 'MPIJob',
  'metadata': {'name': 'mpi-ctask',
   'labels': {'kueue.x-k8s.io/queue-name': 'user-queue'}},
  'spec': {'slotsPerWorker': 6,
   'runPolicy': {'cleanPodPolicy': 'Running'},
   'sshAuthMountPath': '/home/mpiuser/.ssh',
   'mpiReplicaSpecs': {'Launcher': {'replicas': 1,
     'restartPolicy': 'OnFailure',
     'template': {'spec': {'containers': [{'image': 'task-mpi',
         'name': 'ctask-7',
         'securityContext': {'runAsUser': 1000},
         'command': ['mpirun'],
         'args': ['--allow-run-as-root',
          '-np',
          '12',
          '-x',
          'LD_LIBRARY_PATH',
          '/ENV/bin/python3',
          '/concat.py'],
         'resources': {'limits': {'cpu': 1, 'memory': '1Gi'}}}]}}},
    'Worker': {'replicas': 2,
     'template': {'spec': {'containers': [{'image': 'task-mpi',
         'name': 'worker',
         'securityContext': {'runAsUser': 1000},
         'command': ['/usr/sbin/sshd'],
         'args': ['-De', '-f', '/home/mpiuser/.sshd_config'],
         'resources': {'requests': {'cpu': 6, 'memory': '2Gi'},
          'limits': {'cpu': 6, 'memory': '3Gi'}}}]}}}}}},

Thanks.

andreyvelich commented 11 months ago

@AymenFJA Did you check the Kubeflow Training Operator Python SDK? You can use the create_mpijob API to create your MPIJob.

Unfortunately, our SDK models support only the v1 MPIJob version, not v2beta1, but you can still submit your job via create_mpijob using the dictionary you defined in jobs[0].

AymenFJA commented 11 months ago

@andreyvelich, thanks for your response. I was not aware of the Kubeflow Python SDK until now. Allow me to understand a few things from your comment:

The Kubeflow Python SDK does not support v2beta1 yet. So in principle, I cannot build a v2beta1 job with the SDK models, but I can still submit one to the cluster. Assuming my interpretation is correct, I did the following:

from kubeflow import training
mpi_client = training.TrainingClient(kube_config)
mpi_client.create_mpijob(pod_dict) # I used the same dictionary object as above (jobs[0])

and I got the following error:

File ~/ve/raptor/lib/python3.8/site-packages/kubeflow/training/utils/utils.py:94, in create_job(custom_api, job, namespace, job_kind, job_plural)
     89     raise TimeoutError(
     90         f"Timeout to create {job_kind}: {namespace}/{job.metadata.name}"
     91     )
     92 except Exception:
     93     raise RuntimeError(
---> 94         f"Failed to create {job_kind}: {namespace}/{job.metadata.name}"
     95     )
     97 logging.info(f"{job_kind} {namespace}/{job.metadata.name} has been created")

AttributeError: 'dict' object has no attribute 'metadata'

So I do not think create_mpijob takes a dictionary object.

Next, I converted my jobs[0] dict to an object, something like this:

class Struct:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            if isinstance(value, dict):
                self.__dict__[key] = Struct(**value)
            else:
                self.__dict__[key] = value

pod = Struct(**jobs[0])

mpi_client.create_mpijob(pod)

I get another error:

File ~/ve/raptor/lib/python3.8/site-packages/kubernetes/client/api_client.py:238, in ApiClient.sanitize_for_serialization(self, obj)
    231 else:
    232     # Convert model obj to dict except
    233     # attributes `openapi_types`, `attribute_map`
    234     # and attributes which value is not None.
    235     # Convert attribute name to json key in
    236     # model definition for request.
    237     obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
--> 238                 for attr, _ in six.iteritems(obj.openapi_types)
    239                 if getattr(obj, attr) is not None}
    241 return {key: self.sanitize_for_serialization(val)
    242         for key, val in six.iteritems(obj_dict)}

AttributeError: 'Struct' object has no attribute 'openapi_types'

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
Input In [21], in <cell line: 1>()
----> 1 mpi_client.create_mpijob(pod_obj)

File ~/ve/raptor/lib/python3.8/site-packages/kubeflow/training/api/training_client.py:1377, in TrainingClient.create_mpijob(self, mpijob, namespace)
   1361 def create_mpijob(
   1362     self,
   1363     mpijob: models.KubeflowOrgV1MPIJob,
   1364     namespace=utils.get_default_target_namespace(),
   1365 ):
   1366     """Create the MPIJob.
   1367
   1368     Args:
   (...)
   1374         RuntimeError: Failed to create MPIJob.
   1375     """
-> 1377     utils.create_job(
   1378         custom_api=self.custom_api,
   1379         job=mpijob,
   1380         namespace=namespace,
   1381         job_kind=constants.MPIJOB_KIND,
   1382         job_plural=constants.MPIJOB_PLURAL,
   1383     )

File ~/ve/raptor/lib/python3.8/site-packages/kubeflow/training/utils/utils.py:93, in create_job(custom_api, job, namespace, job_kind, job_plural)
     89     raise TimeoutError(
     90         f"Timeout to create {job_kind}: {namespace}/{job.metadata.name}"
     91     )
     92 except Exception:
---> 93     raise RuntimeError(
     94         f"Failed to create {job_kind}: {namespace}/{job.metadata.name}"
     95     )
     97 logging.info(f"{job_kind} {namespace}/{job.metadata.name} has been created")
RuntimeError: Failed to create MPIJob: default/mpi-ctask

Am I missing something or doing something wrong? Thanks.

andreyvelich commented 11 months ago

Oh, I think you are right. When we create a Training Job, the SDK automatically uses the v1 API version to create the CR.

In that case, the SDK might not be useful for you. You should use create_namespaced_custom_object directly from the Kubernetes Python client instead.

I think the original problem was that the arguments were incorrect: the namespace comes before the plural resource name, and the plural is mpijobs, not mpijob. It should be like this:

client.CustomObjectsApi().create_namespaced_custom_object('kubeflow.org', 'v2beta1', 'default', 'mpijobs', jobs[0])
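
To see why the argument order matters, here is a minimal sketch of how the positional arguments map onto the REST path for a namespaced custom object. The helper custom_object_path is hypothetical and not part of the Kubernetes Python client; it only mirrors the /apis/{group}/{version}/namespaces/{namespace}/{plural} layout that the API server expects.

```python
def custom_object_path(group: str, version: str, namespace: str, plural: str) -> str:
    """Hypothetical helper: build the REST path for a namespaced custom object."""
    return f"/apis/{group}/{version}/namespaces/{namespace}/{plural}"


# Original call order: 'mpijob' lands in the namespace slot and
# 'default' lands in the plural slot, so no such resource exists.
wrong = custom_object_path("kubeflow.org", "v2beta1", "mpijob", "default")

# Corrected call order: namespace first, then the plural resource name.
right = custom_object_path("kubeflow.org", "v2beta1", "default", "mpijobs")

print(wrong)  # /apis/kubeflow.org/v2beta1/namespaces/mpijob/default
print(right)  # /apis/kubeflow.org/v2beta1/namespaces/default/mpijobs
```

The first path does not correspond to any registered resource, which is consistent with the plain "404 page not found" body in the error above.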

AymenFJA commented 11 months ago

@andreyvelich Thank you so much. That worked (mpijob --> mpijobs, silly me). Closing now.
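
For anyone landing here later, one way to avoid this kind of positional mix-up is to pass keyword arguments. The sketch below is only an illustration: submit_mpijob is a hypothetical wrapper (not part of any SDK), and it assumes the object passed as custom_api exposes create_namespaced_custom_object with the group, version, namespace, plural, and body parameters, as kubernetes.client.CustomObjectsApi does.

```python
from typing import Any, Dict


def submit_mpijob(custom_api: Any, body: Dict[str, Any], namespace: str = "default") -> Any:
    """Hypothetical wrapper: create an MPIJob via a CustomObjectsApi-style client.

    Keyword arguments keep namespace and plural from being swapped by position.
    """
    # Derive group and version from the manifest, e.g. "kubeflow.org/v2beta1".
    group, version = body["apiVersion"].split("/", 1)
    return custom_api.create_namespaced_custom_object(
        group=group,
        version=version,
        namespace=namespace,
        plural="mpijobs",  # plural resource name, not the kind "MPIJob"
        body=body,
    )


# Usage against a real cluster (assumes the kubernetes package and a kubeconfig):
#   from kubernetes import client, config
#   config.load_kube_config()
#   submit_mpijob(client.CustomObjectsApi(), jobs[0])
```

Reading the group and version from the manifest's apiVersion keeps the request consistent with the object being submitted, so a v2beta1 manifest is never posted to a v1 endpoint by accident.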