kubeflow / pipelines

Machine Learning Pipelines for Kubeflow
https://www.kubeflow.org/docs/components/pipelines/
Apache License 2.0

Unable to mount VolumeOp volumes inside recursive graph components #1891

Closed: logicbomb421 closed this issue 3 years ago

logicbomb421 commented 5 years ago

Small Backstory: I am attempting to design a pipeline that processes a large dataset in batches, with the batch size controlled by the batch_size pipeline parameter. Once the data has been batched, I am using a graph_component to loop over all batches and spawn processing nodes.

Each of these processing nodes needs to have the working directory created by the initial VolumeOp mounted to it.

What happened: When attempting to mount the VolumeOp-based volume into the recursively spawned ContainerOp nodes, I receive the error: This step is in Error state with this message: volume '<pvc_that_was_just_created_by_volumeop>' not found in workflow spec

What did you expect to happen: The VolumeOp-based volume mounts into all ContainerOp containers, regardless of whether they were spawned recursively.

What steps did you take: I have tried various ways to make this work, from simply calling add_pvolumes on the recursively spawned ContainerOps, to manually creating the k8s resources and appending them (.add_volume, .container.add_volume_mount), all the way to manually editing the generated YAML based on what I was reading about looping in the Argo docs.
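For reference, the "manually creating the k8s resources" approach looked roughly like the sketch below (the helper and the volume key are illustrative, not the exact code from that attempt):

import kfp.dsl as dsl
import kubernetes.client as k8s

def mount_existing_pvc(op: dsl.ContainerOp, pvc_name: str, mount_path: str) -> dsl.ContainerOp:
  # Attach an already-created PVC to a ContainerOp using raw Kubernetes objects.
  volume_key = 'work-volume'  # the key the workflow spec should reference
  op.add_volume(k8s.V1Volume(
    name=volume_key,
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name)))
  op.container.add_volume_mount(
    k8s.V1VolumeMount(name=volume_key, mount_path=mount_path))
  return op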

Anything else you would like to add: Here is a self-contained example that should reproduce this (I attempted to attach it to the issue, but GitHub was choking on the upload).

Instructions

1. Build a base image using the included Dockerfile
2. Push this base image to Docker Hub
3. Replace the string `BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE` in `pipeline.py` with this newly created image+tag
4. Replace the string `CHANGE_ME_TO_A_GCS_BUCKET` with an existing GCS bucket you have access to
5. Ensure the file `500.json` exists in the bucket (included with example).
6. Compile and upload the pipeline (see the sketch after this list)
7. Run the pipeline
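A minimal sketch of step 6, assuming the standard KFP SDK compiler and client (the host URL is a placeholder, and pipeline.py is assumed to be importable from the working directory):

import kfp
import kfp.compiler as compiler
from pipeline import pipeline_func

# Compile the pipeline into an Argo workflow package
compiler.Compiler().compile(pipeline_func, 'pipeline.tar.gz')

# Upload the package to a KFP deployment (or upload it through the UI instead)
client = kfp.Client(host='http://<your-kfp-host>/pipeline')
client.upload_pipeline('pipeline.tar.gz')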

Dockerfile

FROM python:3.7-alpine
RUN pip install \
  requests \
  google-cloud-storage

One-liner for 500.json

python -c 'import json; fd = open("500.json", "w"); json.dump([1] * 500, fd); fd.close()'

pipeline.py

import kfp.dsl as dsl
import kfp.components as components
import kubernetes.client as k8s
from collections import namedtuple
import inspect

def data_load(bucket: str, key: str, work_dir: str) -> str:
  from google.cloud import storage
  from os.path import basename, join
  c = storage.Client()
  bucket = c.get_bucket(bucket)
  blob = bucket.get_blob(key)
  if blob:
    filename = join(work_dir, basename(key))
    blob.download_to_filename(filename)
    return filename
data_load_op = components.func_to_container_op(data_load, base_image='BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE')

def data_split(batch_size: int, data_filename: str, work_dir: str) -> str:
  print(f'data_split(batch_size={batch_size}, data_filename={data_filename})')
  def chunk(l: list, c: int):
    print(f'  -> chunk(l=[{len(l)}], c={c})')
    for i in range(0, len(l), c):
      print(f'    -> yield [{len(l[i:i+c])}]')
      yield l[i:i+c]
  import json, os, uuid
  with open(data_filename, 'r') as fd:
    all_data = json.load(fd)
    print(f'all data json contained {len(all_data)} records')
  chunks = list(chunk(all_data, batch_size))
  chunk_files = []
  for i, chnk in enumerate(chunks):
    chunk_file = os.path.join(work_dir, f'{i}.chunk')
    chunk_files.append(chunk_file)
    with open(chunk_file, 'w') as fd:
      json.dump(chnk, fd)
  return json.dumps(chunk_files)
data_split_op = components.func_to_container_op(data_split, base_image='BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE')

Next = namedtuple('result', 'index batch')
def next_batch(index: int, batches: str) -> Next:
  print(f'next_batch(index={index}, batches={batches}')
  import json
  batches_ = json.loads(batches)
  if index <= len(batches_) - 1:
    nxt = batches_[index]
    print(f'  -> return Next(index={index+1}, batch={nxt})')
    return Next(index=index+1, batch=nxt)
  print(f'-> return Next(index=-1, batch=None)')
  return Next(index=-1, batch=None)
next_op = components.func_to_container_op(
  next_batch, 
  base_image='python:3.7-alpine',
  extra_code="from collections import namedtuple; Next = namedtuple('result', 'index batch')"
)

def process_batch(batch: str) -> None:
  import json
  with open(batch, 'r') as fd:
    batch_data = json.load(fd)
  print(f'loaded {len(batch_data)} records in batch')
  import time
  import random
  sleep_secs = random.randrange(0, 100)
  print(f'sleeping for {sleep_secs} seconds')
  time.sleep(sleep_secs)
process_op = components.func_to_container_op(process_batch, base_image='BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE')

def print_message(message: str) -> None:
  print(message or 'NO MESSAGE SUPPLIED!')
print_op = components.func_to_container_op(print_message, base_image='python:3.7-alpine')

@dsl.graph_component
def looper(batch, batches, index, work_vol_name, work_dir):
  import kfp.gcp as gcp
  p1 = process_op(batch).apply(gcp.use_gcp_secret('user-gcp-sa'))
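  # NOTE: this mount references a V1Volume built from the PVC-name output; it is the
  # mount that fails with "not found in workflow spec" (see the discussion below).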
  p1.add_pvolumes({work_dir: k8s.V1Volume(name=work_vol_name)})
  next_ = next_op(index, batches)
  with dsl.Condition(next_.outputs['index'] != '-1'):
    looper(next_.outputs['batch'], batches, next_.outputs['index'], work_vol_name, work_dir)

@dsl.pipeline(name='Self-Contained Example')
def pipeline_func(bucket="CHANGE_ME_TO_A_GCS_BUCKET",
                  key="500.json",
                  batch_size=250,
                  work_dir='/var/pipeline'):
  #################################################
  # meta init   
  import time
  import kfp.gcp as gcp
  from os import path
  from uuid import uuid4

  WORK_DIR = work_dir
  WORK_VOLUME_NAME = 'pipeline-workdir'
  workdirop = dsl.VolumeOp(
    name='Create Scratch Volume',
    resource_name=WORK_VOLUME_NAME,
    size='1Gi',
    modes=dsl.VOLUME_MODE_RWM,
    storage_class='nfs'
  )

  load_op = data_load_op(
    bucket=bucket,
    key=key,
    work_dir=WORK_DIR
  ).apply(gcp.use_gcp_secret('user-gcp-sa')).add_pvolumes({WORK_DIR: workdirop.volume}).after(workdirop)
  load_op.container.set_image_pull_policy('Always')

  split_op = data_split_op(
    batch_size=batch_size,
    work_dir=WORK_DIR,
    data_filename=load_op.output
  ).apply(gcp.use_gcp_secret('user-gcp-sa')).add_pvolumes({WORK_DIR: workdirop.volume}).after(load_op)

  first_op = next_op(
    index=0,
    batches=split_op.output
  )

  loop_op = looper(
    batch=first_op.outputs['batch'], 
    batches=split_op.output, 
    index=first_op.outputs['index'],
    work_vol_name=workdirop.outputs['name'],
    work_dir=WORK_DIR
  ).after(workdirop).after(load_op).after(split_op).after(first_op)

  complete_op = print_op(message='DONE').after(loop_op)

paveldournov commented 5 years ago

/cc @kevinbache to investigate

elikatsis commented 5 years ago

Hello @logicbomb421, and sorry for the late response.

I compiled the Python code you provided (pipeline.py) and noticed the following:

  - container:
    ...
    volumeMounts:
    - mountPath: /secret/gcp-credentials
      name: gcp-credentials-user-gcp-sa
    - mountPath: '{{inputs.parameters.work-dir}}'
      name: '{{inputs.parameters.create-scratch-volume-name}}'
    ...
    name: process-batch
    ...

The problem is that {{inputs.parameters.create-scratch-volume-name}} resolves to the PVC name, but that is not what should be used here. What must be used instead is the volume name as referenced in the workflow spec (the name field of the volumes list). The name of the PVC resource means nothing to the workflow. It works the same way StatefulSets do: volumes is a list mapping cluster volume resources to keys (names), and to use those resources we have to reference them through those keys.

I notice that the data-split template references it the correct way:

  - container:
    ...
    volumeMounts:
    - mountPath: /secret/gcp-credentials
      name: gcp-credentials-user-gcp-sa
    - mountPath: '{{inputs.parameters.work-dir}}'
      name: create-scratch-volume
    ...
    name: data-split
    ...

So let's debug pipeline.py. In the looper() function you use work_vol_name as the volume name:

p1.add_pvolumes({work_dir: k8s.V1Volume(name=work_vol_name)})

When you call that function, you pass workdirop.outputs['name'] as work_vol_name. That is an output parameter holding the name of the created PVC which, as mentioned earlier, should not be used here.

You should mount workdirop.volume, the same way you do for data-split:

split_op = data_split_op(   
    batch_size=batch_size,    
    work_dir=WORK_DIR,
    data_filename=load_op.output
).apply(gcp.use_gcp_secret('user-gcp-sa')).add_pvolumes({WORK_DIR: workdirop.volume}).after(load_op)

So I suggest passing workdirop.volume to the looper() function as well (and maybe renaming the argument from work_vol_name to work_vol). Another way would be to pass workdirop.name, which essentially is the volume name used, but I wouldn't recommend it since that naming decision could change in the future.
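A minimal sketch of that suggestion, keeping everything else from the example unchanged (the argument is renamed to work_vol; this is the variant that produces the error quoted below):

@dsl.graph_component
def looper(batch, batches, index, work_vol, work_dir):
  import kfp.gcp as gcp
  p1 = process_op(batch).apply(gcp.use_gcp_secret('user-gcp-sa'))
  # Mount the PipelineVolume itself instead of a V1Volume built from the PVC name
  p1.add_pvolumes({work_dir: work_vol})
  next_ = next_op(index, batches)
  with dsl.Condition(next_.outputs['index'] != '-1'):
    looper(next_.outputs['batch'], batches, next_.outputs['index'], work_vol, work_dir)

# ...and in pipeline_func, pass the volume instead of its name:
# loop_op = looper(
#   batch=first_op.outputs['batch'],
#   batches=split_op.output,
#   index=first_op.outputs['index'],
#   work_vol=workdirop.volume,
#   work_dir=WORK_DIR
# ).after(workdirop).after(load_op).after(split_op).after(first_op)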


Trying that, though, I get the following error:

ValueError: arguments to looper should be PipelineParams.

But I believe it isn't the desired behavior.

I get that a dsl.Condition probably needs to contain PipelineParams, but I think that arguments to a graph component could be other types as well.

@Ark-kun, Hey Alexey, any thoughts on the last part?

logicbomb421 commented 5 years ago

@elikatsis thank you for the reply! I actually started building this by passing workdirop.volume to looper, but ran into the same error you encountered. I switched to passing the names since I could be sure they would be PipelineParams, as they were the output of another operation.

Ideally, however, it would be great to be able to pass the actual k8s resource that was created to looper.

Ark-kun commented 5 years ago

> Trying that, though, I get the following error:
>
> ValueError: arguments to looper should be PipelineParams.
>
> But I believe it isn't the desired behavior.
>
> I get that a dsl.Condition probably needs to contain PipelineParams, but I think that arguments to a graph component could be other types as well.
>
> @Ark-kun, Hey Alexey, any thoughts on the last part?

I agree with you. I'll need to check the implementation of @graph_component some day. @gaoning777 Do you remember why this limitation exists?

davidyuyuan commented 4 years ago

I am getting the same error "invalid spec: templates.1000g-variant.tasks.for-loop-for-loop-7301aa19-1 failed to resolve {{inputs.parameters.ds-human-g1k-v37-name}}" in dsl.ParallelFor as well. By applying the workaround mentioned above, I was able to start a run. Here is the code segment with the workaround:

with dsl.ParallelFor(readqueries()) as query:
    with dsl.ParallelFor(REGIONS) as region:
        # freebayes_task = \
        freebayes_op(region=region,
                     query=query,
                     # https://github.com/kubeflow/pipelines/issues/1891
                     # workaround as .volume passes in pvc name incorrectly in for loops
                     # input_vol=human_g1k_v37.volume.after(samtools_task),
                     input_vol=k8s_client.models.V1Volume(name=human_g1k_v37.name),
                     # output_vol=vcf_output.volume,
                     output_vol=k8s_client.models.V1Volume(name=vcf_output.name),
                     host=oneclient_provider_host,
                     token=oneclient_access_token,
                     # .after() added to work around issue 1891
                     insecure=oneclient_insecure).after(samtools_task, vcf_output, human_g1k_v37)

The problem does not exist outside of dsl.ParallelFor.

gaoning777 commented 4 years ago

/assign @Ark-kun

gaoning777 commented 4 years ago

/assign @numerology

gaoning777 commented 4 years ago

/unassign @gaoning777

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 4 years ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

fondlykip commented 4 years ago

> ValueError: arguments to looper should be PipelineParams.

Was a fix implemented for this issue? I am also facing this error when passing a pvolume to a @graph_component.

Bobgy commented 4 years ago

/reopen

k8s-ci-robot commented 4 years ago

@Bobgy: Reopened this issue.

In response to [this](https://github.com/kubeflow/pipelines/issues/1891#issuecomment-715388138):

> /reopen

fondlykip commented 4 years ago

It seems to me that we would just need to include PipelineVolume in dsl._component.py here

like:

from ._pipeline_volume import PipelineVolume

...

if not isinstance(input, (PipelineParam, PipelineVolume)):
    raise ValueError('arguments to ' + func.__name__ + ' should be PipelineParams or PipelineVolumes.')

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 3 years ago

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.