IBM / data-prep-kit

Open source project for data preparation of LLM application builders
Apache License 2.0

[Bug] Cannot connect to an external Ray Cluster #373

Open kb-0311 opened 4 months ago

kb-0311 commented 4 months ago


What happened + What you expected to happen

  1. The Bug:

I am trying to execute a transform with the Ray runtime on an external Ray cluster, without KFP or kind.

I have a Ray cluster running locally in a Docker container, and I have been trying to submit the sample local Ray tokenization job to that cluster by changing the transform_launcher config:

if self.run_locally:
    # Will create a local Ray cluster
    logger.debug("running locally creating Ray cluster")
    # enable metrics for local Ray
    ray.init("ray://[::1]:10001", ignore_reinit_error=True)

For context: ray://[::1]:10001 is the Ray client address of the cluster running in my container.
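
As an aside on the address format above: the square brackets are needed because ::1 is an IPv6 literal, and 10001 is the default Ray client server port. A minimal sketch of building such an address, assuming a hypothetical helper name (ray_client_address is not part of DPK or Ray):

```python
import ipaddress


def ray_client_address(host: str, port: int = 10001) -> str:
    """Build a ray:// URL, bracketing the host when it is an IPv6 literal.

    Hypothetical helper for illustration; not part of DPK or Ray.
    """
    try:
        is_ipv6 = ipaddress.ip_address(host).version == 6
    except ValueError:
        # Not an IP literal, so treat it as a hostname.
        is_ipv6 = False
    return f"ray://[{host}]:{port}" if is_ipv6 else f"ray://{host}:{port}"


print(ray_client_address("::1"))        # ray://[::1]:10001
print(ray_client_address("localhost"))  # ray://localhost:10001
```

The resulting string is what would be passed as the first argument to ray.init in the snippet above.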

Then I changed the local execution flag in the make run-cli-sample target to --run_locally False and got this error:

03:17:52 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
03:17:52 INFO - actor creation delay 0
03:17:52 INFO - job details {'job category': 'preprocessing', 'job name': 'Tokenization', 'job type': 'ray', 'job id': 'job_id'}
03:18:23 INFO - Exception running ray remote orchestration
Initialization failure from server:
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/util/client/server/", line 704, in Datapath
    if not self.proxy_manager.start_specific_server(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/util/client/server/", line 299, in start_specific_server
    runtime_env_config = job_config._get_proto_runtime_env_config()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/", line 224, in _get_proto_runtime_env_config
    return self._get_proto_job_config().runtime_env_info.runtime_env_config
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/", line 209, in _get_proto_job_config
    if self.py_logging_config:
AttributeError: 'JobConfig' object has no attribute 'py_logging_config'

03:18:23 INFO - Completed execution in 0.5071521480878194 min, execution result 1

This happens when running make run-cli-sample in the ray/ directory.

  2. Expected Behaviour: The transform should be executed on the remote Ray cluster, process the input files, and return the output files.

  3. Additional useful information: Running a Ray cluster locally (outside Docker), then setting --run_locally False in the make command and executing make run-cli-sample, gives the same error.

However, running make run-cli-sample without any changes to the repository starts a local Ray cluster and executes the transform just fine.

Reproduction script

Start a Ray cluster outside of DPK using:

ray start --head

Go to data-prep-kit/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ and, in the else block, change the Ray client URL used for the remote connection to the client URL of your local cluster:

def _submit_for_execution(self) -> int:
    """
    Submit for Ray execution
    """
    res = 1
    start = time.time()
    try:
        if self.run_locally:
            # Will create a local Ray cluster
            logger.debug("running locally creating Ray cluster")
            # enable metrics for local Ray
        else:
            # connect to the existing cluster
            logger.info("Connecting to the existing Ray cluster")
            ray.init("ray://[::1]:10001", ignore_reinit_error=True)
        logger.debug("Starting orchestrator")
        res = ray.get(...)  # orchestrator call arguments are missing from the original post
        logger.debug("Completed orchestrator")
    except Exception as e:
        logger.info(f"Exception running ray remote orchestration\n{e}")
    finally:
        logger.info(f"Completed execution in {(time.time() - start)/60.} min, execution result {res}")
        return res

Go to data-prep-kit/transforms/universal/tokenization/ray and open the Makefile in that directory.

Change the run-cli-sample command by setting the run_locally flag to False:

                RUN_ARGS="--run_locally False --data_local_config \"{ 'input_folder' : '../test-data/ds01/input', 'output_folder' : '../output'}\"  \
make venv

. venv/bin/activate

make run-cli-sample




Red Hat Enterprise Linux (RHEL)




kb-0311 commented 4 months ago

@daw3rd I created this issue and am writing up a few follow-up experiments I performed that I think are very relevant to this problem.

kb-0311 commented 4 months ago

  1. I started a Ray cluster inside the virtual env (activated the venv in the tokenization/ray directory and ran ray start --head), set --run_locally False in the make command, and ran make run-cli-sample. That works: I am able to connect to the Ray cluster remotely. However, the input file paths are not handled correctly, and I get a new error:

    (venv) [kanishka@ml-pipelines ray]$ make run-cli-sample
    make \
                RUN_ARGS="--run_locally False --data_local_config \"{ 'input_folder' : '../test-data/ds01/input', 'output_folder' : '../output'}\"  \
    make[1]: Entering directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
    source venv/bin/activate;       \
    cd src;                         \
    python --run_locally False --data_local_config "{ 'input_folder' : '../test-data/ds01/input', 'output_folder' : '../output'}"                  
    None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
    23:08:59 INFO - Launching Tokenization transform
    23:08:59 INFO - connecting to existing cluster
    23:08:59 INFO - data factory data_ is using local data access: input_folder - ../test-data/ds01/input output_folder - ../output
    23:08:59 INFO - data factory data_ max_files -1, n_sample -1
    23:08:59 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
    23:08:59 INFO - pipeline id pipeline_id
    23:08:59 INFO - code location None
    23:08:59 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
    23:08:59 INFO - actor creation delay 0
    23:08:59 INFO - job details {'job category': 'preprocessing', 'job name': 'Tokenization', 'job type': 'ray', 'job id': 'job_id'}
    23:08:59 INFO - Connecting to the existing Ray cluster
    2024-07-02 23:08:59,421 INFO -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error
    SIGTERM handler is not set because current thread is not the main thread.
    (orchestrate pid=510039) None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
    (orchestrate pid=510039) 23:09:04 INFO - orchestrator started at 2024-07-02 23:09:04
    (orchestrate pid=510039) 23:09:04 ERROR - No input files to process - exiting
    23:09:14 INFO - Completed execution in 0.24884503682454426 min, execution result 0
    make[1]: Leaving directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
  2. With the same Ray cluster started inside the venv (ray start --head), I tried make run-s3-sample to see whether MinIO would solve the file path problem, changing the launcher params to "run_locally": False. I hit the same issue: I was able to connect to the cluster, but accessing the files failed:

    (venv) [kanishka@ml-pipelines ray]$ make run-s3-sample
    make .defaults.minio.verify-running
    make[1]: Entering directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
    make[1]: Leaving directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
    make[1]: Entering directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
    source venv/bin/activate;       \
    cd src;                         \
    None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
    environ({'SHELL': '/bin/bash', 'COLORTERM': 'truecolor', 'HISTCONTROL': 'ignoredups', 'TERM_PROGRAM_VERSION': '1.90.2', 'HISTSIZE': '1000', 'HOSTNAME': '', 'MAKE_TERMOUT': '/dev/pts/3', 'HOMEBREW_PREFIX': '/home/linuxbrew/.linuxbrew', 'PWD': '/home/kanishka/work/data-prep-kit/transforms/universal/tokenization/ray/src', 'LOGNAME': 'kanishka', 'XDG_SESSION_TYPE': 'tty', 'MANPATH': '/home/linuxbrew/.linuxbrew/share/man:/home/linuxbrew/.linuxbrew/share/man::', 'MAKEOVERRIDES': '${-*-command-variables-*-}', 'VSCODE_GIT_ASKPASS_NODE': '/home/kanishka/.vscode-server/cli/servers/Stable-5437499feb04f7a586f677b155b039bc2b3669eb/server/node', 'MOTD_SHOWN': 'pam', 'HOME': '/home/kanishka', 'LANG': 'en_US.UTF-8', 'LS_COLORS': 'rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=01;37;41:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.webp=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:
*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=01;36:*.au=01;36:*.flac=01;36:*.m4a=01;36:*.mid=01;36:*.midi=01;36:*.mka=01;36:*.mp3=01;36:*.mpc=01;36:*.ogg=01;36:*.ra=01;36:*.wav=01;36:*.oga=01;36:*.opus=01;36:*.spx=01;36:*.xspf=01;36:', 'VIRTUAL_ENV': '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray/venv', 'SSL_CERT_DIR': '/etc/pki/tls/certs', 'RUN_FILE': '', 'GIT_ASKPASS': '/home/kanishka/.vscode-server/cli/servers/Stable-5437499feb04f7a586f677b155b039bc2b3669eb/server/extensions/git/dist/', 'SSH_CONNECTION': ' 55827 22', 'MFLAGS': '-w', 'INFOPATH': '/home/linuxbrew/.linuxbrew/share/info:/home/linuxbrew/.linuxbrew/share/info:', 'VSCODE_GIT_ASKPASS_EXTRA_ARGS': '', 'XDG_SESSION_CLASS': 'user', 'MAKEFLAGS': 'w --', 'SELINUX_ROLE_REQUESTED': '', 'TERM': 'xterm-256color', 'LESSOPEN': '||/usr/bin/ %s', 'USER': 'kanishka', 'MAKE_TERMERR': '/dev/pts/3', 'VSCODE_GIT_IPC_HANDLE': '/run/user/6000/vscode-git-b3d771e5f0.sock', 'HOMEBREW_CELLAR': '/home/linuxbrew/.linuxbrew/Cellar', 'SELINUX_USE_CURRENT_RANGE': '', 'SHLVL': '3', 'MAKELEVEL': '2', 'HOMEBREW_REPOSITORY': '/home/linuxbrew/.linuxbrew/Homebrew', 'XDG_SESSION_ID': '358', 'VIRTUAL_ENV_PROMPT': '(venv) ', 'XDG_RUNTIME_DIR': '/run/user/6000', 'SSL_CERT_FILE': '/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem', 'PS1': '(venv) ', 'SSH_CLIENT': ' 55827 22', 'PYENV_ROOT': '/home/kanishka/.pyenv', 'which_declare': 'declare -f', 'VSCODE_GIT_ASKPASS_MAIN': '/home/kanishka/.vscode-server/cli/servers/Stable-5437499feb04f7a586f677b155b039bc2b3669eb/server/extensions/git/dist/askpass-main.js', 'XDG_DATA_DIRS': '/home/kanishka/.local/share/flatpak/exports/share:/var/lib/flatpak/exports/share:/usr/local/share:/usr/share', 'BROWSER': '/home/kanishka/.vscode-server/cli/servers/Stable-5437499feb04f7a586f677b155b039bc2b3669eb/server/bin/helpers/', 'PATH': 
'/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray/venv/bin:/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray/venv/bin:/home/kanishka/miniconda3/bin:/home/kanishka/.vscode-server/cli/servers/Stable-5437499feb04f7a586f677b155b039bc2b3669eb/server/bin/remote-cli:/home/kanishka/.pyenv/shims:/home/kanishka/.pyenv/bin:/home/linuxbrew/.linuxbrew/bin:/home/linuxbrew/.linuxbrew/sbin:/home/kanishka/miniconda3/bin:/home/kanishka/.pyenv/bin:/home/linuxbrew/.linuxbrew/bin:/home/linuxbrew/.linuxbrew/sbin:/home/kanishka/miniconda3/bin:/home/kanishka/.local/bin:/home/kanishka/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin', 'SELINUX_LEVEL_REQUESTED': '', 'DBUS_SESSION_BUS_ADDRESS': 'unix:path=/run/user/6000/bus', 'MAIL': '/var/spool/mail/kanishka', 'OLDPWD': '/home/kanishka/work/data-prep-kit/transforms/universal/tokenization/ray', 'TERM_PROGRAM': 'vscode', 'VSCODE_IPC_HOOK_CLI': '/run/user/6000/vscode-ipc-ef6d1917-4b0a-4eeb-a8a0-6d909519031b.sock', 'BASH_FUNC_which%%': '() {  ( alias;\n eval ${which_declare} ) | /usr/bin/which --tty-only --read-alias --read-functions --show-tilde --show-dot $@\n}', '_': '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray/venv/bin/python', 'RAY_CLIENT_MODE': '0'})
    23:16:17 INFO - connecting to existing cluster
    23:16:17 INFO - data factory data_ is using S3 data access: input path - test/tokenization/ds01/input, output path - test/tokenization/ds01/output
    23:16:17 INFO - data factory data_ max_files -1, n_sample -1
    23:16:17 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
    23:16:17 INFO - pipeline id pipeline_id
    23:16:17 INFO - code location {'github': 'github', 'commit_hash': '12345', 'path': 'path'}
    23:16:17 INFO - number of workers 3 worker options {'num_cpus': 0.8, 'max_restarts': -1}
    23:16:17 INFO - actor creation delay 0
    23:16:17 INFO - job details {'job category': 'preprocessing', 'job name': 'Tokenization', 'job type': 'ray', 'job id': 'job_id'}
    23:16:17 INFO - Connecting to the existing Ray cluster
    2024-07-02 23:16:17,296 INFO -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error
    SIGTERM handler is not set because current thread is not the main thread.
    (orchestrate pid=510035) None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
    (orchestrate pid=510035) 23:16:23 INFO - orchestrator started at 2024-07-02 23:16:23
    (orchestrate pid=510035) 23:16:23 ERROR - No input files to process - exiting
    23:16:33 INFO - Completed execution in 0.2762212514877319 min, execution result 0
    make[1]: Leaving directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'

You may want to stop the minio server now (see make help)

So I am guessing that the handling of local files is the issue here. Let me know if there are any workarounds I could try.

3. The last thing I tried was to deactivate the venv and run a Ray cluster locally on my machine, outside the venv, using the same Ray version (v2.24.0) that DPK uses. I did not get the JobConfig error, but I still was not able to connect to it. Logs:

(venv) [kanishka@ml-pipelines ray]$ make run-cli-sample
make \
                RUN_ARGS="--run_locally False --data_local_config \"{ 'input_folder' : '../test-data/ds01/input', 'output_folder' : '../output'}\"  \
make[1]: Entering directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
source venv/bin/activate;       \
cd src;                         \
python --run_locally False --data_local_config "{ 'input_folder' : '../test-data/ds01/input', 'output_folder' : '../output'}"                  
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
22:56:37 INFO - Launching Tokenization transform
22:56:37 INFO - connecting to existing cluster
22:56:37 INFO - data factory data_ is using local data access: input_folder - ../test-data/ds01/input output_folder - ../output
22:56:37 INFO - data factory data_ max_files -1, n_sample -1
22:56:37 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
22:56:37 INFO - pipeline id pipeline_id
22:56:37 INFO - code location None
22:56:37 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
22:56:37 INFO - actor creation delay 0
22:56:37 INFO - job details {'job category': 'preprocessing', 'job name': 'Tokenization', 'job type': 'ray', 'job id': 'job_id'}
22:56:37 INFO - Connecting to the existing Ray cluster
2024-07-02 22:56:37,488 INFO -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error
SIGTERM handler is not set because current thread is not the main thread.
Put failed:
22:56:41 INFO - Exception running ray remote orchestration
No module named 'data_processing_ray'
22:56:41 INFO - Completed execution in 0.06163370609283447 min, execution result 1
make[1]: *** [../../../../.make.defaults:374:] Error 1
make[1]: Leaving directory '/mnt/xvdc/work/data-prep-kit/transforms/universal/tokenization/ray'
make: *** [Makefile:43: run-cli-sample] Error 2
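
The "No module named 'data_processing_ray'" message above suggests that the environment which started the cluster does not have the DPK library installed. A minimal sketch of a preflight check, assuming it is run with the same Python interpreter that runs ray start --head (the helper name missing_modules is hypothetical):

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be imported here."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Run this with the same interpreter that runs `ray start --head`.
required = ["ray", "data_processing_ray"]
missing = missing_modules(required)
if missing:
    print("Not importable in this environment:", ", ".join(missing))
else:
    print("All required modules are importable.")
```

If data_processing_ray shows up as missing, the cluster workers would not be able to unpickle the orchestrator code sent by the client, which would be consistent with the "Put failed" line in the log.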

kb-0311 commented 4 months ago

So the current problems in DPK are:

  1. It is not possible to connect to a remote Ray cluster to execute a transform (a feature that is useful when a computationally heavy transform needs to run in a distributed environment).
  2. Passing data from local storage to a remote Ray runtime is not handled well (or maybe there are some config changes I am missing, in which case feel free to correct me : ) ).
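
One possible contributor to the "No input files to process" errors above is that the folders in --data_local_config are relative ('../test-data/ds01/input'), while the orchestrator runs in a cluster process whose working directory may differ from the client's. This is an assumption, not something the logs confirm. A minimal sketch of resolving the folders to absolute paths before building the config (absolutize_local_config is a hypothetical helper, and it only helps when the cluster can see the same filesystem as the client):

```python
import os


def absolutize_local_config(config: dict, base_dir: str) -> dict:
    """Return a copy of config with relative folder paths resolved against base_dir.

    Hypothetical helper for illustration; not part of DPK.
    """
    resolved = {}
    for key, value in config.items():
        if not os.path.isabs(value):
            # Lexical normalisation only: the path does not have to exist yet.
            value = os.path.normpath(os.path.join(base_dir, value))
        resolved[key] = value
    return resolved


local_config = {
    "input_folder": "../test-data/ds01/input",
    "output_folder": "../output",
}
print(absolutize_local_config(local_config, base_dir=os.getcwd()))
```

The returned dictionary has the same keys as the one passed to --data_local_config in the Makefile, so it could be serialized in its place.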

blublinsky commented 3 months ago

  1. It is absolutely possible to connect to a remote Ray cluster and submit a job. We currently do this through KFP using a Ray remote job. We are not advertising this feature externally because it is quite error-prone if not done correctly: it requires the same versions of both Ray and Python on the client and the cluster, and the code to be executed must be present on the Ray cluster. The reason for providing the KFP wrapper is to shield the user from these details, which can be hard to debug.
  2. If you are using a remote cluster, there is no guarantee that your local data is accessible from it, so this was never supported. If you are using a remote cluster, you should use externally accessible storage, for example S3.
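
The version requirement in point 1 can be checked before connecting. A minimal sketch, assuming the cluster's versions are collected by hand (for example by running ray --version and python --version on the head node); the helper names and the Python version shown are hypothetical, and 2.24.0 is the Ray version mentioned earlier in this thread:

```python
import sys


def version_mismatches(local: dict, cluster: dict) -> list:
    """List human-readable differences between client and cluster versions."""
    problems = []
    for key in ("ray", "python"):
        if local.get(key) != cluster.get(key):
            problems.append(
                f"{key}: client {local.get(key)} != cluster {cluster.get(key)}"
            )
    return problems


def local_versions(ray_version: str) -> dict:
    """Describe the client side; ray_version is passed in to avoid importing ray."""
    python = ".".join(str(part) for part in sys.version_info[:3])
    return {"ray": ray_version, "python": python}


# Hypothetical cluster values: replace them with what the head node reports.
cluster = {"ray": "2.24.0", "python": "3.9.19"}
client = local_versions(ray_version="2.24.0")
for problem in version_mismatches(client, cluster):
    print("Version mismatch:", problem)
```

An empty result does not guarantee that the job will run, since the transform code still has to be importable on the cluster, but a non-empty one points at the kind of mismatch described above.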

blublinsky commented 3 months ago

Can we please close this?