kubeflow-incubator / tfjob-java-client

Java Client for Kubeflow TFJob
Apache License 2.0

Implement a user-friendly wrapper for client #4

Closed: jetmuffin closed this issue 5 years ago

jetmuffin commented 5 years ago

The generated client code is too complicated to use, because every job has to be assembled from a deep hierarchy of nested objects. Here is an example:

    V1alpha2TFJob job =
        new V1alpha2TFJob()
            .apiVersion("kubeflow.org/v1alpha2")
            .kind("TFJob")
            .metadata(new V1ObjectMeta().name("test"));
    V1alpha2TFJobSpec spec = new V1alpha2TFJobSpec().cleanPodPolicy("Running");

    Map<String, V1alpha2TFReplicaSpec> replicas = new HashMap<>();

    // PS Spec
    V1alpha2TFReplicaSpec psSpec =
        new V1alpha2TFReplicaSpec().replicas(2).restartPolicy("OnFailure");

    V1PodTemplateSpec psTemplateSpec = new V1PodTemplateSpec();
    V1PodSpec psPodSpec = new V1PodSpec();

    V1Container psContainer =
        new V1Container()
            .name("tensorflow")
            .image("registry.cn-hangzhou.aliyuncs.com/jetmuffin/tensorflow:1.7.0-hdfs");
    psContainer.setCommand(Arrays.asList("kfrun", "/app/dist-mnist.py"));
    psContainer.addVolumeMountsItem(new V1VolumeMount().mountPath("/app").name("code1"));
    psContainer.addVolumeMountsItem(
        new V1VolumeMount().mountPath("/tmp/mnist-data").name("volume1"));
    psPodSpec.addContainersItem(psContainer);

    psPodSpec.addVolumesItem(
        new V1Volume()
            .name("code1")
            .hostPath(new V1HostPathVolumeSource().path("/tmp/mnist_code")));
    psPodSpec.addVolumesItem(
        new V1Volume()
            .name("volume1")
            .hostPath(new V1HostPathVolumeSource().path("/tmp/mnist_data")));

    psTemplateSpec.setSpec(psPodSpec);
    psSpec.setTemplate(psTemplateSpec);

    // Worker Spec
    V1alpha2TFReplicaSpec workerSpec =
        new V1alpha2TFReplicaSpec().replicas(2).restartPolicy("OnFailure");
    V1PodTemplateSpec workerTemplateSpec = new V1PodTemplateSpec();
    V1PodSpec workerPodSpec = new V1PodSpec();

    V1Container workerContainer =
        new V1Container()
            .name("tensorflow")
            .image("registry.cn-hangzhou.aliyuncs.com/jetmuffin/tensorflow:1.7.0-hdfs");
    workerContainer.setCommand(Arrays.asList("kfrun", "/app/dist-mnist.py"));
    workerContainer.addVolumeMountsItem(new V1VolumeMount().mountPath("/app").name("code2"));
    workerContainer.addVolumeMountsItem(
        new V1VolumeMount().mountPath("/tmp/mnist-data").name("volume2"));
    workerPodSpec.addContainersItem(workerContainer);

    workerPodSpec.addVolumesItem(
        new V1Volume()
            .name("code2")
            .hostPath(new V1HostPathVolumeSource().path("/tmp/mnist_code")));
    workerPodSpec.addVolumesItem(
        new V1Volume()
            .name("volume2")
            .hostPath(new V1HostPathVolumeSource().path("/tmp/mnist_data")));

    workerTemplateSpec.setSpec(workerPodSpec);
    workerSpec.setTemplate(workerTemplateSpec);

    replicas.put("PS", psSpec);
    replicas.put("Worker", workerSpec);

    spec.setTfReplicaSpecs(replicas);
    job.setSpec(spec);

In the new client, the same kind of job is defined in a fluent style:

    KubeflowClient client =
        KubeflowClientFactory.newInstanceFromConfig("/home/mofeng.cj/kubeconfig");

    TFReplica ps =
        new TFReplica()
            .replicas(1)
            .cpu(1.0)
            .memory(1024.0)
            .image("registry.cn-hangzhou.aliyuncs.com/jetmuffin/word2vec_kubeflow")
            .command("python entry.py");
    TFReplica worker =
        new TFReplica()
            .replicas(1)
            .cpu(2.0)
            .memory(2048.0)
            .image("registry.cn-hangzhou.aliyuncs.com/jetmuffin/word2vec_kubeflow")
            .command("python entry.py");
    Job job =
        new Job()
            .name("test")
            .ps(ps)
            .worker(worker)
            .cleanupPolicy("running");
    client.submitJob(job);
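
To make the idea concrete, here is a minimal, self-contained sketch of how such a wrapper might expand the flat fluent calls back into the nested structure shown in the first example. It is not the project's actual implementation. The class `FluentTFJobSketch`, the helper `entry`, and the methods `toSpec` and `toManifest` are hypothetical, and the sketch builds plain maps instead of the generated model classes so that it runs without any dependencies. It also assumes that `memory()` is given in MiB and that the command string can be split on whitespace.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: these classes only mirror the names used in the issue;
// they are not the project's real implementation.
public class FluentTFJobSketch {

    /** Builds a single-entry map; helper introduced only for this sketch. */
    static Map<String, Object> entry(String key, Object value) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put(key, value);
        return m;
    }

    /** Flat, chainable description of one replica group (PS or worker). */
    static class TFReplica {
        private int replicas = 1;
        private double cpu = 1.0;
        private double memoryMiB = 1024.0; // assumption: memory() is given in MiB
        private String image;
        private String command;

        TFReplica replicas(int n) { this.replicas = n; return this; }
        TFReplica cpu(double cores) { this.cpu = cores; return this; }
        TFReplica memory(double mib) { this.memoryMiB = mib; return this; }
        TFReplica image(String image) { this.image = image; return this; }
        TFReplica command(String command) { this.command = command; return this; }

        /** Expands the flat fields into the nested replica-spec structure. */
        Map<String, Object> toSpec() {
            Map<String, Object> limits = new LinkedHashMap<>();
            limits.put("cpu", String.valueOf(cpu));
            limits.put("memory", (long) memoryMiB + "Mi");

            Map<String, Object> container = new LinkedHashMap<>();
            container.put("name", "tensorflow");
            container.put("image", image);
            if (command != null) {
                // Naive whitespace split; quoted arguments are not handled.
                container.put("command", Arrays.asList(command.trim().split("\\s+")));
            }
            container.put("resources", entry("limits", limits));

            Map<String, Object> podSpec =
                entry("containers", Collections.singletonList(container));

            Map<String, Object> spec = new LinkedHashMap<>();
            spec.put("replicas", replicas);
            spec.put("restartPolicy", "OnFailure");
            spec.put("template", entry("spec", podSpec));
            return spec;
        }
    }

    /** Flat, chainable description of the whole job. */
    static class Job {
        private String name;
        private String cleanupPolicy = "Running";
        private final Map<String, TFReplica> replicas = new LinkedHashMap<>();

        Job name(String name) { this.name = name; return this; }
        Job ps(TFReplica r) { replicas.put("PS", r); return this; }
        Job worker(TFReplica r) { replicas.put("Worker", r); return this; }
        Job cleanupPolicy(String policy) { this.cleanupPolicy = policy; return this; }

        /** Assembles the hierarchical job definition from the flat fields. */
        Map<String, Object> toManifest() {
            Map<String, Object> replicaSpecs = new LinkedHashMap<>();
            for (Map.Entry<String, TFReplica> e : replicas.entrySet()) {
                replicaSpecs.put(e.getKey(), e.getValue().toSpec());
            }
            Map<String, Object> spec = new LinkedHashMap<>();
            spec.put("cleanPodPolicy", cleanupPolicy);
            spec.put("tfReplicaSpecs", replicaSpecs);

            Map<String, Object> manifest = new LinkedHashMap<>();
            manifest.put("apiVersion", "kubeflow.org/v1alpha2");
            manifest.put("kind", "TFJob");
            manifest.put("metadata", entry("name", name));
            manifest.put("spec", spec);
            return manifest;
        }
    }

    public static void main(String[] args) {
        TFReplica ps = new TFReplica().replicas(1).cpu(1.0).memory(1024.0)
            .image("example/image").command("python entry.py");
        Job job = new Job().name("test").ps(ps).cleanupPolicy("Running");
        System.out.println(job.toManifest());
    }
}
```

The point of the design is that callers only touch the flat setters, while the nesting (replica spec, pod template, pod spec, container) is produced in one place. That also removes the copy-and-paste between the PS and worker sections of the original example.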