sungsoo / sungsoo.github.io

Sung-Soo Kim's Blog

feat: Python SDK for Kubeflow Training Operator #17

Open sungsoo opened 2 years ago

sungsoo commented 2 years ago

Python SDK for Kubeflow Training Operator

The Training Operator provides a Python SDK for its custom resources. More documentation is available in the sdk/python folder of the repository.

Use pip to install the latest release of the SDK:

pip install kubeflow-training

Examples

First, download the examples that come with the Python SDK for the Kubeflow Training Operator.

git clone https://github.com/kubeflow/training-operator.git

cd training-operator
cd sdk/python
python setup.py install --user

For testing, go to the mnist example folder.

cd examples
cd pytorch
cd mnist
sungsoo commented 2 years ago

PyTorchJobClient

PyTorchJobClient(config_file=None, context=None, client_configuration=None, persist_config=True)

The client loads authentication and cluster information from a kube-config file and stores them in kubernetes.client.configuration. The parameters are as follows:

Parameter | Description
config_file | Name of the kube-config file. Defaults to ~/.kube/config. If the SDK runs inside a cluster and you want to operate a PyTorchJob in another, remote cluster, you must set config_file explicitly to load that kube-config file, e.g. PyTorchJobClient(config_file="~/.kube/config").
context | The active context to use. If set to None, current_context from the config file is used.
client_configuration | The kubernetes.client.Configuration object to set the configs on.
persist_config | If True, the config file is updated when it changes (e.g., on a GCP token refresh).

The PyTorchJobClient APIs are as follows:

Class | Method | Description
PyTorchJobClient | create | Create a PyTorchJob
PyTorchJobClient | get | Get the specified PyTorchJob, or all PyTorchJobs in the namespace
PyTorchJobClient | patch | Patch the specified PyTorchJob
PyTorchJobClient | delete | Delete the specified PyTorchJob
PyTorchJobClient | wait_for_job | Wait for the specified job to finish
PyTorchJobClient | wait_for_condition | Wait until any of the specified conditions occurs
PyTorchJobClient | get_job_status | Get the PyTorchJob status
PyTorchJobClient | is_job_running | Check whether the PyTorchJob is running
PyTorchJobClient | is_job_succeeded | Check whether the PyTorchJob has succeeded
PyTorchJobClient | get_pod_names | Get the pod names of the PyTorchJob
PyTorchJobClient | get_logs | Get the training logs of the PyTorchJob

create

create(pytorchjob, namespace=None)

Create the provided pytorchjob in the specified namespace.

Example

from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1Container
from kubernetes.client import V1ResourceRequirements

from kubeflow.training import constants
from kubeflow.training import utils
from kubeflow.training import V1ReplicaSpec
from kubeflow.training import KubeflowOrgV1PyTorchJob
from kubeflow.training import KubeflowOrgV1PyTorchJobSpec
from kubeflow.training import PyTorchJobClient

# Class names vary by SDK release: the tested session later in this thread
# (release 1.4.0) imports V1PyTorchJob and V1PyTorchJobSpec instead.

# Container shared by the master and worker replicas
container = V1Container(
    name="pytorch",
    image="gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0",
    args=["--backend", "gloo"],
)

# Master replica specification
master = V1ReplicaSpec(
    replicas=1,
    restart_policy="OnFailure",
    template=V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[container]
        )
    )
)

# Worker replica specification
worker = V1ReplicaSpec(
    replicas=1,
    restart_policy="OnFailure",
    template=V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[container]
        )
    )
)

# PyTorchJob definition
pytorchjob = KubeflowOrgV1PyTorchJob(
    api_version="kubeflow.org/v1",
    kind="PyTorchJob",
    metadata=V1ObjectMeta(name="mnist", namespace='default'),
    spec=KubeflowOrgV1PyTorchJobSpec(
        clean_pod_policy="None",
        pytorch_replica_specs={"Master": master,
                               "Worker": worker}
    )
)

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.create(pytorchjob)

Parameters

Name | Type | Description | Notes
pytorchjob | KubeflowOrgV1PyTorchJob | PyTorchJob definition | Required
namespace | str | Namespace to deploy the PyTorchJob to. If not given, the namespace from the PyTorchJob definition is used; if that is also unset, the current or default namespace is used. | Optional

Return type

object
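
The namespace fallback described in the parameter table can be read as a precedence rule. The sketch below is a hypothetical helper (resolve_namespace is not part of the SDK) and assumes the order is: explicit argument, then the namespace in the job definition, then the current namespace, then "default". The SDK's own logic may differ in detail.

```python
from typing import Optional


def resolve_namespace(explicit: Optional[str],
                      job_namespace: Optional[str],
                      current: Optional[str] = None) -> str:
    """Return the first namespace that is set, falling back to "default".

    Hypothetical helper for illustration only; not an SDK function.
    """
    for candidate in (explicit, job_namespace, current):
        if candidate:
            return candidate
    return "default"


# The explicit argument wins over the namespace in the job definition.
print(resolve_namespace("kubeflow", "default"))
# With no argument, the namespace in the job definition is used.
print(resolve_namespace(None, "etri-traindb-ml"))
```

Under this reading, create(pytorchjob) with no namespace argument deploys to whatever namespace the job metadata names.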

get

get(name=None, namespace=None, watch=False, timeout_seconds=600)

Get the created pytorchjob in the specified namespace.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.get('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | PyTorchJob name. If not specified, all PyTorchJobs in the namespace are returned. | Optional
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional
watch | bool | If True, watch the created PyTorchJob; otherwise return the created PyTorchJob object. Watching stops when timeout_seconds is reached or once the PyTorchJob status is Succeeded or Failed. | Optional
timeout_seconds | int | Timeout in seconds for watching. Defaults to 600. | Optional

Return type

object

patch

patch(name, pytorchjob, namespace=None)

Patch the created pytorchjob in the specified namespace.

Note that the patch API may not work if you want to change a field from an existing value to None; use the replace API to remove the field value instead.

Example


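from kubeflow.training import KubeflowOrgV1PyTorchJob
from kubeflow.training import PyTorchJobClient

pytorchjob = KubeflowOrgV1PyTorchJob(
    api_version="kubeflow.org/v1",
    ... #update something in PyTorchJob spec
)

pytorchjob_client = PyTorchJobClient()
# Pass the updated PyTorchJob object defined above
pytorchjob_client.patch('mnist', pytorchjob)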

Parameters

Name | Type | Description | Notes
name | str | PyTorchJob name | Required
pytorchjob | KubeflowOrgV1PyTorchJob | PyTorchJob definition | Required
namespace | str | The PyTorchJob's namespace for patching. If not given, the namespace from the PyTorchJob definition is used; if that is also unset, the current or default namespace is used. | Optional

Return type

object
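
One plausible reason for the note about None is that a client which skips unset (None) fields when serializing cannot express "remove this field" in a patch body. The sketch below illustrates that effect on a plain dict; drop_none is a hypothetical stand-in for such a serializer, not an SDK function, and the field names are taken from the job output shown later in this thread.

```python
def drop_none(value):
    """Recursively remove None-valued keys, as a serializer that skips unset fields would."""
    if isinstance(value, dict):
        return {k: drop_none(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [drop_none(v) for v in value]
    return value


# Intended patch: clear cleanPodPolicy and scale the workers to 2.
intended = {
    "spec": {
        "runPolicy": {"cleanPodPolicy": None},
        "pytorchReplicaSpecs": {"Worker": {"replicas": 2}},
    }
}

# What would actually be sent: the None field is gone, so nothing is cleared.
sent = drop_none(intended)
print(sent)
```

The replica change survives, but the request to clear cleanPodPolicy never reaches the server, which is why a full replace is needed for that case.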

delete

delete(name, namespace=None)

Delete the created pytorchjob in the specified namespace.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.delete('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | PyTorchJob name |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional

Return type

object

wait_for_job

wait_for_job(name, namespace=None, watch=False, timeout_seconds=600, polling_interval=30, status_callback=None)

Wait for the specified job to finish.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.wait_for_job('mnist', namespace='kubeflow')

# The API also supports watching the PyTorchJob status till it's Succeeded or Failed.
pytorchjob_client.wait_for_job('mnist', namespace='kubeflow', watch=True)
NAME                           STATE                TIME
pytorch-dist-mnist-gloo        Created              2020-01-02T09:21:22Z
pytorch-dist-mnist-gloo        Running              2020-01-02T09:21:36Z
pytorch-dist-mnist-gloo        Running              2020-01-02T09:21:36Z
pytorch-dist-mnist-gloo        Running              2020-01-02T09:21:36Z
pytorch-dist-mnist-gloo        Running              2020-01-02T09:21:36Z
pytorch-dist-mnist-gloo        Succeeded            2020-01-02T09:26:38Z

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional
watch | bool | If True, watch the PyTorchJob. Watching stops when timeout_seconds is reached or once the PyTorchJob status is Succeeded or Failed. | Optional
timeout_seconds | int | How long to wait for the job, in seconds. Defaults to 600. | Optional
polling_interval | int | How often to poll for the status of the job. | Optional
status_callback | callable | If supplied, this callable is invoked after each poll of the job. It takes a single argument, the pytorchjob. | Optional

Return type

object
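
A status_callback could look like the sketch below. It assumes the job is passed to the callback as a dict shaped like the get() output shown later in this thread (metadata.name and status.conditions), and that the last entry in status.conditions is the current one; latest_condition and print_status are hypothetical names.

```python
from typing import Any, Dict


def latest_condition(pytorchjob: Dict[str, Any]) -> str:
    """Return the type of the last entry in status.conditions, or "" if there is none."""
    conditions = (pytorchjob.get("status") or {}).get("conditions") or []
    return conditions[-1].get("type", "") if conditions else ""


def print_status(pytorchjob: Dict[str, Any]) -> None:
    """Callback that prints one line per poll."""
    name = (pytorchjob.get("metadata") or {}).get("name", "<unknown>")
    print(f"{name}: {latest_condition(pytorchjob) or 'no conditions yet'}")


# Demo with a trimmed copy of the job shape returned by get() in this thread.
job = {
    "metadata": {"name": "pytorch-dist-mnist-gloo"},
    "status": {"conditions": [{"type": "Created"}, {"type": "Running"}]},
}
print_status(job)
```

It would then be passed as status_callback=print_status in the wait_for_job call.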

wait_for_condition

wait_for_condition(name, expected_condition, namespace=None, timeout_seconds=600, polling_interval=30, status_callback=None)

Waits until any of the specified conditions occur.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.wait_for_condition('mnist', expected_condition=["Succeeded", "Failed"], namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
expected_condition | list | A list of conditions. The function waits until any of the supplied conditions is reached. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional
timeout_seconds | int | How long to wait for the job, in seconds. Defaults to 600. | Optional
polling_interval | int | How often to poll for the status of the job. | Optional
status_callback | callable | If supplied, this callable is invoked after each poll of the job. It takes a single argument, the pytorchjob. | Optional

Return type

object
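
The "wait until any condition is reached" behaviour can be pictured as a polling loop. This is a minimal sketch, not the SDK's implementation: wait_for_any is a hypothetical helper, the status source is any zero-argument callable, and raising TimeoutError on expiry is an assumption made here.

```python
import time
from typing import Callable, Iterable


def wait_for_any(get_status: Callable[[], str],
                 expected: Iterable[str],
                 timeout_seconds: int = 600,
                 polling_interval: int = 30,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until it returns one of `expected` or the timeout expires."""
    expected = set(expected)
    waited = 0
    while True:
        status = get_status()
        if status in expected:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(
                f"status {status!r} after {waited}s, expected one of {sorted(expected)}")
        sleep(polling_interval)
        waited += polling_interval


# Demo with canned statuses instead of a real cluster; sleeping is disabled.
statuses = iter(["Created", "Running", "Running", "Succeeded"])
result = wait_for_any(lambda: next(statuses), ["Succeeded", "Failed"], sleep=lambda _: None)
print(result)
```

Against a real cluster, get_status could be a lambda wrapping pytorchjob_client.get_job_status('mnist', namespace='kubeflow').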

get_job_status

get_job_status(name, namespace=None)

Returns PyTorchJob status, such as Running, Failed or Succeeded.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.get_job_status('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional

Return type

str

is_job_running

is_job_running(name, namespace=None)

Returns True if the PyTorchJob is running; False otherwise.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.is_job_running('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional

Return type

bool

is_job_succeeded

is_job_succeeded(name, namespace=None)

Returns True if the PyTorchJob succeeded; False otherwise.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.is_job_succeeded('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional

Return type

bool

get_pod_names

get_pod_names(name, namespace=None, master=False, replica_type=None, replica_index=None)

Get pod names of the PyTorchJob.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.get_pod_names('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional
master | bool | If True, only get pods labeled 'job-role: master'. |
replica_type | str | One of 'master' or 'worker', to get pods of only that type. By default, pods of all types are returned. |
replica_index | str | Replica index, to get a single pod of the PyTorchJob. |

Return type

set

get_logs

get_logs(name, namespace=None, master=True, replica_type=None, replica_index=None, follow=False)

Get the training logs of the PyTorchJob. By default, only the logs of the pod labeled 'job-role: master' are returned; to get the logs of all pods, specify master=False.

Example

from kubeflow.training import PyTorchJobClient

pytorchjob_client = PyTorchJobClient()
pytorchjob_client.get_logs('mnist', namespace='kubeflow')

Parameters

Name | Type | Description | Notes
name | str | The PyTorchJob name. |
namespace | str | The PyTorchJob's namespace. Defaults to the current or default namespace. | Optional
master | bool | If True, only get pods labeled 'job-role: master'. |
replica_type | str | One of 'master' or 'worker', to get pods of only that type. By default, pods of all types are returned. |
replica_index | str | Replica index, to get a single pod of the PyTorchJob. |
follow | bool | Follow the log stream of the pod. Defaults to False. |

Return type

str

sungsoo commented 2 years ago

Test result for PyTorchJob using Python SDK for Kubeflow

# Install Kubeflow training package for Python
!pip install kubeflow-training
Collecting kubeflow-training
  Downloading kubeflow-training-1.4.0.tar.gz (50 kB)
Requirement already satisfied: certifi>=14.05.14 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (2021.5.30)
Requirement already satisfied: six>=1.10 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (1.16.0)
Requirement already satisfied: python_dateutil>=2.5.3 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (2.8.1)
Requirement already satisfied: setuptools>=21.0.0 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (49.6.0.post20210108)
Requirement already satisfied: urllib3>=1.15.1 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (1.26.5)
Requirement already satisfied: kubernetes>=12.0.0 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (12.0.1)
Requirement already satisfied: table_logger>=0.3.5 in /opt/conda/lib/python3.8/site-packages (from kubeflow-training) (0.3.6)
Collecting retrying>=1.3.3
  Downloading retrying-1.3.3.tar.gz (10 kB)
Requirement already satisfied: google-auth>=1.0.1 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-training) (1.35.0)
Requirement already satisfied: requests in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-training) (2.25.1)
Requirement already satisfied: requests-oauthlib in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-training) (1.3.0)
Requirement already satisfied: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-training) (1.0.1)
Requirement already satisfied: pyyaml>=3.12 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-training) (5.4.1)
Requirement already satisfied: rsa<5,>=3.1.4 in /opt/conda/lib/python3.8/site-packages (from google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-training) (4.7.2)
Requirement already satisfied: cachetools<5.0,>=2.0.0 in /opt/conda/lib/python3.8/site-packages (from google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-training) (4.2.4)
Requirement already satisfied: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.8/site-packages (from google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-training) (0.2.8)
Requirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /opt/conda/lib/python3.8/site-packages (from pyasn1-modules>=0.2.1->google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-training) (0.4.8)
Requirement already satisfied: numpy in /opt/conda/lib/python3.8/site-packages (from table_logger>=0.3.5->kubeflow-training) (1.20.3)
Requirement already satisfied: idna<3,>=2.5 in /opt/conda/lib/python3.8/site-packages (from requests->kubernetes>=12.0.0->kubeflow-training) (2.10)
Requirement already satisfied: chardet<5,>=3.0.2 in /opt/conda/lib/python3.8/site-packages (from requests->kubernetes>=12.0.0->kubeflow-training) (4.0.0)
Requirement already satisfied: oauthlib>=3.0.0 in /opt/conda/lib/python3.8/site-packages (from requests-oauthlib->kubernetes>=12.0.0->kubeflow-training) (3.1.1)
Building wheels for collected packages: kubeflow-training, retrying
  Building wheel for kubeflow-training (setup.py) ... done
  Created wheel for kubeflow-training: filename=kubeflow_training-1.4.0-py3-none-any.whl size=87393 sha256=60974e76c532c196dc1c4484033fe99adecea90fe56068fa32f5872beb4a22cf
  Stored in directory: /home/jovyan/.cache/pip/wheels/90/45/99/000faf2ce58831b35c0572bbab3dde83c82d72bc11f4e1e3ae
  Building wheel for retrying (setup.py) ... done
  Created wheel for retrying: filename=retrying-1.3.3-py3-none-any.whl size=11429 sha256=6682f185b749b53adb014637c0b2ccb73a4fc16410f7af2b7982d1610b1db2ef
  Stored in directory: /home/jovyan/.cache/pip/wheels/c4/a7/48/0a434133f6d56e878ca511c0e6c38326907c0792f67b476e56
Successfully built kubeflow-training retrying
Installing collected packages: retrying, kubeflow-training
Successfully installed kubeflow-training-1.4.0 retrying-1.3.3
# import python client packages for kubernetes
from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1Container
from kubernetes.client import V1ResourceRequirements
# import python client packages for kubeflow
from kubeflow.training import constants
from kubeflow.training.utils import utils
from kubeflow.training import V1ReplicaSpec
from kubeflow.training import V1PyTorchJob
from kubeflow.training import V1PyTorchJobSpec
from kubeflow.training import V1RunPolicy
from kubeflow.training import PyTorchJobClient
namespace = utils.get_current_k8s_namespace()
print('# Current namespace: ', namespace)
# Current namespace:  etri-traindb-ml
# For TrainDB-ML, we need to prepare the following elements and parameters.
# 1. TrainingOperator Type: e.g. name="pytorch"
# 2. Prebuilt docker images in an image repository (e.g., dockerhub)
#    - RSPN, MDN, Autoregressive Model, HyperSPN, etc.
# 3. Parameters and hyperparameters for training
#    - P#1. Model name
#    - P#2. Database access info. (dbuser/dbpassword)
#    - P#3. Table info. ([optional] schema info.)
#    - P#4. 

# MNIST test container
container = V1Container(
    name="pytorch",
    image="gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0",
    args=["--backend","gloo"]
)
# Master node specification
master = V1ReplicaSpec(
    replicas=1,
    restart_policy="OnFailure",
    template=V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[container]
        )
    )
)
# Worker node specification
worker = V1ReplicaSpec(
    replicas=1,
    restart_policy="OnFailure",
    template=V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[container]
        )
    )
)
# PyTorchJob Definition
pytorchjob = V1PyTorchJob(
    api_version="kubeflow.org/v1",
    kind="PyTorchJob",
    metadata=V1ObjectMeta(name="pytorch-dist-mnist-gloo",namespace=namespace),
    spec=V1PyTorchJobSpec(
        run_policy=V1RunPolicy(clean_pod_policy="None"),
        pytorch_replica_specs={"Master": master,
                               "Worker": worker}
    )
)
# Create PyTorchJob Client
pytorchjob_client = PyTorchJobClient()
pytorchjob_client.create(pytorchjob)
{'apiVersion': 'kubeflow.org/v1',
 'kind': 'PyTorchJob',
 'metadata': {'creationTimestamp': '2022-06-20T23:23:36Z',
  'generation': 1,
  'managedFields': [{'apiVersion': 'kubeflow.org/v1',
    'fieldsType': 'FieldsV1',
    'fieldsV1': {'f:spec': {'.': {},
      'f:pytorchReplicaSpecs': {'.': {},
       'f:Master': {'.': {},
        'f:replicas': {},
        'f:restartPolicy': {},
        'f:template': {'.': {}, 'f:spec': {'.': {}, 'f:containers': {}}}},
       'f:Worker': {'.': {},
        'f:replicas': {},
        'f:restartPolicy': {},
        'f:template': {'.': {}, 'f:spec': {'.': {}, 'f:containers': {}}}}},
      'f:runPolicy': {'.': {}, 'f:cleanPodPolicy': {}}}},
    'manager': 'OpenAPI-Generator',
    'operation': 'Update',
    'time': '2022-06-20T23:23:36Z'}],
  'name': 'pytorch-dist-mnist-gloo',
  'namespace': 'etri-traindb-ml',
  'resourceVersion': '12733',
  'selfLink': '/apis/kubeflow.org/v1/namespaces/etri-traindb-ml/pytorchjobs/pytorch-dist-mnist-gloo',
  'uid': 'cd6da473-042b-446f-b34c-932cc7cf318a'},
 'spec': {'pytorchReplicaSpecs': {'Master': {'replicas': 1,
    'restartPolicy': 'OnFailure',
    'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],
        'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',
        'name': 'pytorch'}]}}},
   'Worker': {'replicas': 1,
    'restartPolicy': 'OnFailure',
    'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],
        'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',
        'name': 'pytorch'}]}}}},
  'runPolicy': {'cleanPodPolicy': 'None'}}}
# Get deployment information for PyTorchJob
pytorchjob_client.get('pytorch-dist-mnist-gloo')
{'apiVersion': 'kubeflow.org/v1',
 'kind': 'PyTorchJob',
 'metadata': {'creationTimestamp': '2022-06-20T23:23:36Z',
  'generation': 1,
  'managedFields': [{'apiVersion': 'kubeflow.org/v1',
    'fieldsType': 'FieldsV1',
    'fieldsV1': {'f:spec': {'.': {},
      'f:pytorchReplicaSpecs': {'.': {},
       'f:Master': {'.': {},
        'f:replicas': {},
        'f:restartPolicy': {},
        'f:template': {'.': {}, 'f:spec': {'.': {}, 'f:containers': {}}}},
       'f:Worker': {'.': {},
        'f:replicas': {},
        'f:restartPolicy': {},
        'f:template': {'.': {}, 'f:spec': {'.': {}, 'f:containers': {}}}}},
      'f:runPolicy': {'.': {}, 'f:cleanPodPolicy': {}}}},
    'manager': 'OpenAPI-Generator',
    'operation': 'Update',
    'time': '2022-06-20T23:23:36Z'},
   {'apiVersion': 'kubeflow.org/v1',
    'fieldsType': 'FieldsV1',
    'fieldsV1': {'f:status': {'.': {},
      'f:conditions': {},
      'f:replicaStatuses': {'.': {}, 'f:Master': {}, 'f:Worker': {}}}},
    'manager': 'manager',
    'operation': 'Update',
    'time': '2022-06-20T23:23:36Z'}],
  'name': 'pytorch-dist-mnist-gloo',
  'namespace': 'etri-traindb-ml',
  'resourceVersion': '12752',
  'selfLink': '/apis/kubeflow.org/v1/namespaces/etri-traindb-ml/pytorchjobs/pytorch-dist-mnist-gloo',
  'uid': 'cd6da473-042b-446f-b34c-932cc7cf318a'},
 'spec': {'pytorchReplicaSpecs': {'Master': {'replicas': 1,
    'restartPolicy': 'OnFailure',
    'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],
        'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',
        'name': 'pytorch'}]}}},
   'Worker': {'replicas': 1,
    'restartPolicy': 'OnFailure',
    'template': {'spec': {'containers': [{'args': ['--backend', 'gloo'],
        'image': 'gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0',
        'name': 'pytorch'}]}}}},
  'runPolicy': {'cleanPodPolicy': 'None'}},
 'status': {'conditions': [{'lastTransitionTime': '2022-06-20T23:23:36Z',
    'lastUpdateTime': '2022-06-20T23:23:36Z',
    'message': 'PyTorchJob pytorch-dist-mnist-gloo is created.',
    'reason': 'PyTorchJobCreated',
    'status': 'True',
    'type': 'Created'},
   {'lastTransitionTime': '2022-06-20T23:23:36Z',
    'lastUpdateTime': '2022-06-20T23:23:36Z',
    'message': 'PyTorchJob pytorch-dist-mnist-gloo is running.',
    'reason': 'JobRunning',
    'status': 'True',
    'type': 'Running'}],
  'replicaStatuses': {'Master': {}, 'Worker': {}}}}
# Querying PyTorchJob Client Status
pytorchjob_client.get_job_status('pytorch-dist-mnist-gloo', namespace=namespace)
'Running'
pytorchjob_client.wait_for_job('pytorch-dist-mnist-gloo', namespace=namespace, watch=True)
NAME                           STATE                TIME                          
pytorch-dist-mnist-gloo        Running              2022-06-20T23:23:36Z          
pytorch-dist-mnist-gloo        Running              2022-06-20T23:23:36Z          
pytorch-dist-mnist-gloo        Running              2022-06-20T23:23:36Z          
pytorch-dist-mnist-gloo        Running              2022-06-20T23:23:36Z          
pytorch-dist-mnist-gloo        Succeeded            2022-06-20T23:28:35Z          
pytorchjob_client.is_job_succeeded('pytorch-dist-mnist-gloo', namespace=namespace)
True
pytorchjob_client.get_logs('pytorch-dist-mnist-gloo', namespace=namespace)
The logs of Pod pytorch-dist-mnist-gloo-master-0:
 Using distributed PyTorch with gloo backend
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Processing...
Done!
Train Epoch: 1 [0/60000 (0%)]   loss=2.3000
Train Epoch: 1 [640/60000 (1%)] loss=2.2135
Train Epoch: 1 [1280/60000 (2%)]    loss=2.1704
Train Epoch: 1 [1920/60000 (3%)]    loss=2.0766
Train Epoch: 1 [2560/60000 (4%)]    loss=1.8679
Train Epoch: 1 [3200/60000 (5%)]    loss=1.4135
Train Epoch: 1 [3840/60000 (6%)]    loss=1.0003
Train Epoch: 1 [4480/60000 (7%)]    loss=0.7763
Train Epoch: 1 [5120/60000 (9%)]    loss=0.4598
Train Epoch: 1 [5760/60000 (10%)]   loss=0.4870
Train Epoch: 1 [6400/60000 (11%)]   loss=0.4381
Train Epoch: 1 [7040/60000 (12%)]   loss=0.4089
Train Epoch: 1 [7680/60000 (13%)]   loss=0.4618
Train Epoch: 1 [8320/60000 (14%)]   loss=0.4284
Train Epoch: 1 [8960/60000 (15%)]   loss=0.3992
Train Epoch: 1 [9600/60000 (16%)]   loss=0.3840
Train Epoch: 1 [10240/60000 (17%)]  loss=0.2981
Train Epoch: 1 [10880/60000 (18%)]  loss=0.5013
Train Epoch: 1 [11520/60000 (19%)]  loss=0.5246
Train Epoch: 1 [12160/60000 (20%)]  loss=0.3376
Train Epoch: 1 [12800/60000 (21%)]  loss=0.3678
Train Epoch: 1 [13440/60000 (22%)]  loss=0.4515
Train Epoch: 1 [14080/60000 (23%)]  loss=0.3043
Train Epoch: 1 [14720/60000 (25%)]  loss=0.3581
Train Epoch: 1 [15360/60000 (26%)]  loss=0.3301
Train Epoch: 1 [16000/60000 (27%)]  loss=0.4392
Train Epoch: 1 [16640/60000 (28%)]  loss=0.3626
Train Epoch: 1 [17280/60000 (29%)]  loss=0.3179
Train Epoch: 1 [17920/60000 (30%)]  loss=0.2013
Train Epoch: 1 [18560/60000 (31%)]  loss=0.5004
Train Epoch: 1 [19200/60000 (32%)]  loss=0.3266
Train Epoch: 1 [19840/60000 (33%)]  loss=0.1194
Train Epoch: 1 [20480/60000 (34%)]  loss=0.1898
Train Epoch: 1 [21120/60000 (35%)]  loss=0.1402
Train Epoch: 1 [21760/60000 (36%)]  loss=0.3160
Train Epoch: 1 [22400/60000 (37%)]  loss=0.1499
Train Epoch: 1 [23040/60000 (38%)]  loss=0.2886
Train Epoch: 1 [23680/60000 (39%)]  loss=0.4685
Train Epoch: 1 [24320/60000 (41%)]  loss=0.2157
Train Epoch: 1 [24960/60000 (42%)]  loss=0.1523
Train Epoch: 1 [25600/60000 (43%)]  loss=0.2244
Train Epoch: 1 [26240/60000 (44%)]  loss=0.2634
Train Epoch: 1 [26880/60000 (45%)]  loss=0.2334
Train Epoch: 1 [27520/60000 (46%)]  loss=0.2633
Train Epoch: 1 [28160/60000 (47%)]  loss=0.2123
Train Epoch: 1 [28800/60000 (48%)]  loss=0.1336
Train Epoch: 1 [29440/60000 (49%)]  loss=0.2771
Train Epoch: 1 [30080/60000 (50%)]  loss=0.0937
Train Epoch: 1 [30720/60000 (51%)]  loss=0.1277
Train Epoch: 1 [31360/60000 (52%)]  loss=0.2461
Train Epoch: 1 [32000/60000 (53%)]  loss=0.3392
Train Epoch: 1 [32640/60000 (54%)]  loss=0.1526
Train Epoch: 1 [33280/60000 (55%)]  loss=0.0911
Train Epoch: 1 [33920/60000 (57%)]  loss=0.1446
Train Epoch: 1 [34560/60000 (58%)]  loss=0.1983
Train Epoch: 1 [35200/60000 (59%)]  loss=0.2193
Train Epoch: 1 [35840/60000 (60%)]  loss=0.0628
Train Epoch: 1 [36480/60000 (61%)]  loss=0.1357
Train Epoch: 1 [37120/60000 (62%)]  loss=0.1162
Train Epoch: 1 [37760/60000 (63%)]  loss=0.2369
Train Epoch: 1 [38400/60000 (64%)]  loss=0.0642
Train Epoch: 1 [39040/60000 (65%)]  loss=0.1068
Train Epoch: 1 [39680/60000 (66%)]  loss=0.1601
Train Epoch: 1 [40320/60000 (67%)]  loss=0.1090
Train Epoch: 1 [40960/60000 (68%)]  loss=0.1768
Train Epoch: 1 [41600/60000 (69%)]  loss=0.2297
Train Epoch: 1 [42240/60000 (70%)]  loss=0.0752
Train Epoch: 1 [42880/60000 (71%)]  loss=0.1564
Train Epoch: 1 [43520/60000 (72%)]  loss=0.2789
Train Epoch: 1 [44160/60000 (74%)]  loss=0.1432
Train Epoch: 1 [44800/60000 (75%)]  loss=0.1151
Train Epoch: 1 [45440/60000 (76%)]  loss=0.1212
Train Epoch: 1 [46080/60000 (77%)]  loss=0.0779
Train Epoch: 1 [46720/60000 (78%)]  loss=0.1932
Train Epoch: 1 [47360/60000 (79%)]  loss=0.0708
Train Epoch: 1 [48000/60000 (80%)]  loss=0.2091
Train Epoch: 1 [48640/60000 (81%)]  loss=0.1387
Train Epoch: 1 [49280/60000 (82%)]  loss=0.0939
Train Epoch: 1 [49920/60000 (83%)]  loss=0.1070
Train Epoch: 1 [50560/60000 (84%)]  loss=0.1201
Train Epoch: 1 [51200/60000 (85%)]  loss=0.1448
Train Epoch: 1 [51840/60000 (86%)]  loss=0.0669
Train Epoch: 1 [52480/60000 (87%)]  loss=0.0241
Train Epoch: 1 [53120/60000 (88%)]  loss=0.2617
Train Epoch: 1 [53760/60000 (90%)]  loss=0.0919
Train Epoch: 1 [54400/60000 (91%)]  loss=0.1290
Train Epoch: 1 [55040/60000 (92%)]  loss=0.1901
Train Epoch: 1 [55680/60000 (93%)]  loss=0.0342
Train Epoch: 1 [56320/60000 (94%)]  loss=0.0362
Train Epoch: 1 [56960/60000 (95%)]  loss=0.0762
Train Epoch: 1 [57600/60000 (96%)]  loss=0.1174
Train Epoch: 1 [58240/60000 (97%)]  loss=0.1931
Train Epoch: 1 [58880/60000 (98%)]  loss=0.2089
Train Epoch: 1 [59520/60000 (99%)]  loss=0.0628

accuracy=0.9667
pytorchjob_client.delete('pytorch-dist-mnist-gloo')
{'kind': 'Status',
 'apiVersion': 'v1',
 'metadata': {},
 'status': 'Success',
 'details': {'name': 'pytorch-dist-mnist-gloo',
  'group': 'kubeflow.org',
  'kind': 'pytorchjobs',
  'uid': 'cd6da473-042b-446f-b34c-932cc7cf318a'}}
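
The training log printed in the session above can be post-processed once it is available as a string. The sketch below assumes the log text has been captured (for example from the value get_logs is documented to return) and that lines keep the "Train Epoch: ... loss=..." and "accuracy=..." shapes shown; parse_training_log is a hypothetical helper.

```python
import re
from typing import List, Optional, Tuple

# Patterns match the log lines shown in this thread.
LOSS_LINE = re.compile(r"Train Epoch: (\d+) \[(\d+)/(\d+) \(\d+%\)\]\s+loss=([0-9.]+)")
ACCURACY_LINE = re.compile(r"accuracy=([0-9.]+)")


def parse_training_log(text: str) -> Tuple[List[Tuple[int, int, float]], Optional[float]]:
    """Return ([(epoch, samples_seen, loss), ...], accuracy or None)."""
    losses = [(int(m.group(1)), int(m.group(2)), float(m.group(4)))
              for m in LOSS_LINE.finditer(text)]
    match = ACCURACY_LINE.search(text)
    accuracy = float(match.group(1)) if match else None
    return losses, accuracy


# A few lines copied from the log above.
sample = """Train Epoch: 1 [0/60000 (0%)]   loss=2.3000
Train Epoch: 1 [640/60000 (1%)] loss=2.2135
Train Epoch: 1 [59520/60000 (99%)]  loss=0.0628

accuracy=0.9667
"""
losses, accuracy = parse_training_log(sample)
print(len(losses), losses[-1], accuracy)
```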
sungsoo commented 2 years ago

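Using Python from Java


Practical approaches

There are two practical ways to use Python from Java.

Running a daemon as a service

Here a server is written in Python and Java calls it as a service. This would require running too many servers in the production environment, so this approach is not used for now.

Using a process

Here Java invokes the Python program directly. With java.lang.Runtime or java.lang.ProcessBuilder, Java can run a program on the system and collect its result. For a more detailed explanation, see https://www.baeldung.com/java-lang-processbuilder-api.

In other words, Java runs the Python program and captures whatever Python writes to standard output (stdout) as a string for further processing. For example, suppose a program that converts an image to text is written in Python: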

  1. Java runs the Python program.
  2. The Python program writes the string extracted from the image to standard output.
  3. Java stores that output in a string and uses it.
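
The Python side of these steps can be sketched as below. The names are hypothetical: extract_text is a placeholder for the real image-to-text logic, and the exit codes (0 for success, 2 for bad usage) are assumptions chosen so that the Java side can tell success from failure.

```python
import io
import sys


def extract_text(image_path: str) -> str:
    """Placeholder for the real image-to-text step (hypothetical)."""
    return f"text extracted from {image_path}"


def main(argv, out=sys.stdout, err=sys.stderr) -> int:
    """Write the result to standard output and return a process exit code."""
    if len(argv) != 2:
        # Diagnostics go to standard error so they do not mix with the result.
        print("usage: ocr.py IMAGE", file=err)
        return 2
    print(extract_text(argv[1]), file=out)
    return 0


# Demo: capture what the script would write to standard output.
captured = io.StringIO()
exit_code = main(["ocr.py", "receipt.png"], out=captured)
print(exit_code, captured.getvalue().strip())
```

Saved as a script, it would end with sys.exit(main(sys.argv)) so that the exit value reaches the calling Java process.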

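Basic usage of ProcessBuilder

Pass the command and its arguments when creating a ProcessBuilder. Call the instance's start() method, then call waitFor() on the Process instance that start() returns to wait until the subprocess finishes. waitFor() returns an integer exit value; by convention, 0 means normal termination.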

String command = "python";  // command
String arg1 = "mnist.py"; // argument
ProcessBuilder builder = new ProcessBuilder(command, arg1);
Process process = builder.start();
int exitVal = process.waitFor();  // wait until the child process exits
if(exitVal != 0) {
  // abnormal termination
}

The code above passes the Python interpreter (python.exe on Windows) as the command and the Python program to run as its argument, so the interpreter runs that program.

To read what the child process writes to standard output, use process.getInputStream().

InputStream input = process.getInputStream(); // what the child process writes to its standard output

To pass input to the child process, use process.getOutputStream().

OutputStream output = process.getOutputStream(); // sends input to the child process

The complete code below runs a Python file, reads what that file writes to standard output, and prints it.

String command = "C:\\Anaconda3\\envs\\jep\\python.exe";  // command
String arg1 = "F:\\src\\hyeon\\latteonterrace\\python\\python-exe\\src\\python\\test.py"; // argument
ProcessBuilder builder = new ProcessBuilder(command, arg1);
Process process = builder.start();
// Read the subprocess output before waiting, so a full pipe buffer cannot block the child
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream(), "euc-kr"));
String line;
while ((line = br.readLine()) != null) {
     System.out.println(">>>  " + line); // write to standard output
}
int exitVal = process.waitFor();  // wait until the child process exits
if(exitVal != 0) {
  // abnormal termination
  System.out.println("The subprocess terminated abnormally.");
}

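An encoding is specified when creating the InputStreamReader. Depending on the encoding the child program uses for standard output, Korean text can come out garbled, so the matching encoding must be given.

Korean text can also be garbled when Python writes files or crawls web pages. Python does not always default to UTF-8 (the error shown later reports the cp949 codec, for example), so users outside English-speaking locales need to supply an explicit option. Specifying the encoding solves the problem.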

file=open(fileName, 'w', encoding='utf-8')

While crawling, you may also run into an error like the following.

UnicodeEncodeError: 'cp949' codec can't encode character ...

Adding the following code at the top of the script resolves it.

import sys 
import io 
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding="utf-8")
sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding="utf-8")
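
On Python 3.7 and later, text streams also offer reconfigure(), which changes the encoding in place without detaching and rewrapping. The sketch below is an alternative under that assumption; force_utf8 is a hypothetical helper, and the demo uses an in-memory stream that starts as cp949 so that it does not touch the real sys.stdout.

```python
import io


def force_utf8(stream):
    """Switch a text stream to UTF-8 if it supports reconfigure() (Python 3.7+)."""
    if hasattr(stream, "reconfigure"):
        stream.reconfigure(encoding="utf-8")
    return stream


# In-memory stand-in for a console stream that uses the cp949 codec.
raw = io.BytesIO()
stream = io.TextIOWrapper(raw, encoding="cp949")

force_utf8(stream)

# This character cannot be encoded in cp949 but is fine in UTF-8.
stream.write("한글 \U0001F600")
stream.flush()
print(stream.encoding)
```

In a script, the same helper would be applied as force_utf8(sys.stdout) and force_utf8(sys.stderr) before any output is written.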

redirectErrorStream(true)

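When a subprogram is run with ProcessBuilder and only its standard output is read, there is no way to see what error occurred. redirectErrorStream(true) merges standard error into standard output so that errors can be read from the same stream.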

ProcessBuilder builder = new ProcessBuilder(pythonExe, pyFilePath + "/test.py");
builder.redirectErrorStream(true);  // standard error is written to standard output as well
Process process = builder.start();
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream(), "utf-8"));

Now, if an error occurs in the subprocess, it can be read through getInputStream(). Use the value returned by waitFor() to tell whether what was read from the stream is an error or normal output.

int exitVal = process.waitFor();
if(exitVal != 0)  {
   System.out.println("Abnormal termination");
}

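Redirecting standard input and output

You may want to redirect standard input and output to files. The redirectOutput method sends the output to another destination such as a file. In that case, Process.getInputStream() returns a null input stream (ProcessBuilder.NullInputStream), because the output no longer flows to the parent through a pipe.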

ProcessBuilder processBuilder = new ProcessBuilder("java", "-version");

processBuilder.redirectErrorStream(true);
File log = folder.newFile("java-version.log");
processBuilder.redirectOutput(log);

Process process = processBuilder.start();

Standard output and standard error can also be saved to separate files.

File outputLog = tempFolder.newFile("standard-output.log");
File errorLog = tempFolder.newFile("error.log");

processBuilder.redirectOutput(Redirect.appendTo(outputLog));
processBuilder.redirectError(Redirect.appendTo(errorLog));

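Inheriting the I/O of the current process

Passing Redirect.INHERIT to redirectOutput() and related methods connects the subprocess output as if the parent process had written it directly to System.out. The result shows up on the console without copying the stream to System.out by hand.

With Redirect.INHERIT, Process.getInputStream() returns an instance of java.lang.ProcessBuilder$NullInputStream, and the actual output is not delivered through the stream. When redirectOutput() is not called, the default is Redirect.PIPE, which passes the output to the parent process through a pipe. With the default, as in the earlier examples, you must handle the stream obtained from Process.getInputStream() yourself. redirectOutput(File) can also name a file to write the stream to directly.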

@Test
public void givenProcessBuilder_whenInheritIO_thenSuccess() throws IOException, InterruptedException {
    ProcessBuilder processBuilder = new ProcessBuilder("/bin/sh", "-c", "echo hello");

    processBuilder.inheritIO();
    Process process = processBuilder.start();

    int exitCode = process.waitFor();
    assertEquals("No errors should be detected", 0, exitCode);
}

Other considerations

Let's look at a few things to consider when using Process.