project-codeflare / codeflare-sdk

An intuitive, easy-to-use python interface for batch resource requesting, access, job submission, and observation. Simplifying the developer's life while enabling access to high-performance compute resources, either in the cloud or on-prem.
Apache License 2.0

Running KFP with Codeflare SDK #131

Open blublinsky opened 1 year ago

blublinsky commented 1 year ago

We finally made it work, but I do not think the approach is sustainable for a wider audience. Here is what we had to do:

  1. Because Codeflare SDK relies on the `oc` CLI, we had to build a new image for KFP execution:
    FROM python:3.8.16-slim-bullseye

    RUN apt update && apt install -y wget

    # install oc
    RUN mkdir /opt/oc
    RUN wget -O /opt/oc/release.tar.gz
    RUN tar -xzvf /opt/oc/release.tar.gz -C /opt/oc/ && \
        mv /opt/oc/oc /usr/bin/ && \
        rm -rf /opt/oc

    # install libraries
    RUN pip install --upgrade pip && pip install codeflare-sdk
    RUN pip install "ray[default]"==2.1.0

    # Allow writes for yaml files
    RUN chmod -R 777 /tmp

    Note that we also had to provide a writable directory for saving the intermediate YAML files.
  2. Because Codeflare SDK directly manipulates MCAD, RayCluster, and OpenShift Route resources, we had to grant additional permissions to the `pipeline-runner` service account; these should eventually go into the KFDef. Here are the files:

    apiVersion:
    kind: Role
    metadata:
      name: kfp-openshift-route
    rules:
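The writable-directory requirement from step 1 could also be checked at runtime inside the component, before the SDK writes its intermediate YAML. The following is only a minimal sketch using the standard library; the helper name `ensure_writable_cwd` and the `codeflare-` prefix are hypothetical and not part of Codeflare SDK.

```python
import os
import tempfile


def ensure_writable_cwd(candidates=None):
    """Switch to the first writable directory among candidates and return it.

    Hypothetical helper: falls back to a fresh temporary directory
    if none of the candidates is writable.
    """
    if candidates is None:
        candidates = [os.getcwd(), tempfile.gettempdir()]
    for path in candidates:
        # A directory needs both write and search permission to create files in it
        if os.path.isdir(path) and os.access(path, os.W_OK | os.X_OK):
            os.chdir(path)
            return path
    fallback = tempfile.mkdtemp(prefix="codeflare-")
    os.chdir(fallback)
    return fallback


workdir = ensure_writable_cwd()
print(f"Writing intermediate files to {workdir}")
```

Whether this removes the need for the `chmod` in the image depends on how the container's filesystem and user are set up, so treat it as a complement rather than a replacement.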

# execute ray pipeline
def execure_ray_pipeline(token: str,       # token to authenticate to cluster
                         name: str,        # name of Ray cluster
                         min_worker: str,  # min number of workers
                         max_worker: str,  # max number of workers
                         min_cpus: str,    # min cpus per worker
                         max_cpus: str,    # max cpus per worker
                         min_memory: str,  # min memory per worker
                         max_memory: str,  # max memory per worker
                         image: str = ""
                         ):
    # Ray code - basically hello world

import ray
@ray.remote
class Counter:
    def __init__(self):
        self.counter = 0

    def inc(self):
        self.counter += 1

    def get_counter(self):
        return self.counter

# Import pieces from codeflare-sdk
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration, list_all_clusters, list_all_queued
from codeflare_sdk.cluster.auth import TokenAuthentication
import os

# get current namespace
ns = os.getenv('NAMESPACE', 'default')
# change the current directory to ensure that we can write

print(f"Executing in namespace {ns}, current working directory is {os.getcwd()}")

# Create authentication object for oc user permissions
auth = TokenAuthentication(
except Exception as e:
    print(f"Failed to log into openshift cluster, error {e}. Please check token/server values provided")
print("successfully logged in")
# Create and configure our cluster object (and appwrapper)
cluster = Cluster(ClusterConfiguration(
    name = name,
    namespace = ns,
    min_worker = int(min_worker),
    max_worker = int(max_worker),
    min_cpus = int(min_cpus),
    max_cpus = int(max_cpus),
    min_memory = int(min_memory),
    max_memory = int(max_memory),
    image = image,
))
print(f"Configuration for Ray cluster {name} in namespace {ns} is created")

# bring up the cluster
print(f"Creating Ray cluster {name} in namespace {ns}...")

# and wait for it to be up
rc = cluster.details(print_to_console=False)
print("Ray cluster is ready")

# Get cluster connection points
ray_cluster_uri = cluster.cluster_uri()
print(f"Ray_cluster is at {ray_cluster_uri}")

# Connect to the cluster
try:
    ray.init(address=f'{ray_cluster_uri}', ignore_reinit_error=True)
except Exception as e:
    print(f"Failed to connect to Ray cluster, error {e}")
print("connected to Ray cluster")

# execute Ray function
print("Running Hello world")
counter = Counter.remote()

for _ in range(5):

# delete cluster
print("All done. Cleaning up")


ray_pipiline_op = comp.func_to_container_op(
    func=execure_ray_pipeline,
    base_image="blublinsky1/kfp-oc:0.0.2"
)

# Pipeline to invoke execution on remote resource
@dsl.pipeline(
    name='simple-ray-pipeline',
    description='Pipeline to show how to use codeflare sdk to create Ray cluster and run jobs'
)
def simple_ray_pipeline(token: str,               # token to authenticate to cluster
                        name: str = "kfp-ray",    # name of Ray cluster
                        min_worker: str = "2",    # min number of workers
                        max_worker: str = "2",    # max number of workers
                        min_cpus: str = "2",      # min cpus per worker
                        max_cpus: str = "2",      # max cpus per worker
                        min_memory: str = "4",    # min memory per worker
                        max_memory: str = "4",    # max memory per worker
                        image: str = ""
                        ):
    # invoke pipeline
    pipeline = ray_pipiline_op(token, name, min_worker, max_worker, min_cpus, max_cpus, min_memory,
                               max_memory, image)
    pipeline.execution_options.caching_strategy.max_cache_staleness = "P0D"

if __name__ == '__main__':
    # Compiling the pipeline
    TektonCompiler().compile(simple_ray_pipeline, __file__.replace('.py', '.yaml'))
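Since every numeric pipeline parameter above is passed as a string and only converted with `int()` inside the component, a bad value surfaces late, after the container has started. A small validation step at the top of the component might make such failures easier to read. This is a sketch only; `parse_bounds` is a hypothetical helper, not something the SDK or KFP provides.

```python
def parse_bounds(label, min_value, max_value):
    """Convert a string min/max pair to ints and check that 0 <= min <= max."""
    try:
        low, high = int(min_value), int(max_value)
    except ValueError as err:
        raise ValueError(
            f"{label}: expected integers, got {min_value!r} and {max_value!r}"
        ) from err
    if low < 0 or low > high:
        raise ValueError(f"{label}: need 0 <= min <= max, got min={low}, max={high}")
    return low, high


# Example with the pipeline's default values
workers = parse_bounds("workers", "2", "2")
memory = parse_bounds("memory", "4", "4")
```

The returned pairs could then be passed to `ClusterConfiguration` in place of the individual `int(...)` calls.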

After all this, the workflow works correctly.
Need to also add an implementation based on

yuanchi2807 commented 1 year ago

Since a component is already running in the cluster, do we still need TokenAuthentication? Can one invoke load_incluster_config instead?

from kubernetes import client, config
loadedconf = config.load_incluster_config()

If TokenAuthentication is required, how does one update a token in a scheduled operational environment? Does it need to be compiled in also?

# Create authentication object for oc user permissions
    auth = TokenAuthentication(

roytman commented 1 year ago

The following works:

# Create authentication object for oc user permissions
    with open("/var/run/secrets/", "r") as file:
        token =
    auth = TokenAuthentication(token=token, server="https://kubernetes.default:443", skip_tls=True)
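Reading the token from a mounted file at run time, as in the snippet above, also speaks to the earlier question about scheduled runs: the token would not have to be compiled into the pipeline. A minimal sketch of that read step follows, assuming the token is stored as plain text; `read_token` is a hypothetical helper, and the path is left as a parameter because it is truncated in the snippet above. The demo uses a temporary file so it can run outside a cluster.

```python
import os
import tempfile
from pathlib import Path


def read_token(token_path):
    """Return the token stored in token_path with surrounding whitespace removed."""
    token = Path(token_path).read_text(encoding="utf-8").strip()
    if not token:
        raise ValueError(f"token file {token_path} is empty")
    return token


# Demo against a temporary file standing in for the mounted secret
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "token")
    Path(demo_path).write_text("example-token\n", encoding="utf-8")
    demo_token = read_token(demo_path)
```

Stripping whitespace matters because a trailing newline in the file would otherwise end up inside the value passed to `TokenAuthentication`.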