ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core] Ray session conflicts with PyArrow+HDFS #36415

Open krfricke opened 1 year ago

krfricke commented 1 year ago

What happened + What you expected to happen

Using PyArrow fs with HDFS works fine outside a ray session:

file_sys, file_path = pyarrow.fs.FileSystem.from_uri(hdfs_folder)
file_infos = file_sys.get_file_info(pyarrow.fs.FileSelector(file_path, recursive=False))

However, after ray.init(), the same code results in a segmentation fault:

2023-06-14 01:27:37,622 INFO worker.py:1614 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
*** SIGSEGV received at time=1686731258 on cpu 0 ***     
PC: @     0x7f99d20c5822  (unknown)  (unknown)   
    @     0x7f996fa6ec85        208  absl::lts_20220623::WriteFailureInfo() 
    @     0x7f996fa6e9c8         64  absl::lts_20220623::AbslFailureSignalHandler()                                    
    @     0x7f99e81c6420       3408  (unknown)      
    @     0x7f99d1c2782e         48  (unknown)
    @     0x7f99d1c2cc0f        240  (unknown)                                                                                                        
    @     0x7f99d2267a5f        144  (unknown)                                                                                                        
    @     0x7f99d2267d53        128  (unknown)
    @     0x7f99d21092a0         64  (unknown)
    @     0x7f99e81ba609  (unknown)  start_thread           
[2023-06-14 01:27:38,591 E 9716 9731] logging.cc:361: *** SIGSEGV received at time=1686731258 on cpu 0 ***
[2023-06-14 01:27:38,591 E 9716 9731] logging.cc:361: PC: @     0x7f99d20c5822  (unknown)  (unknown)
[2023-06-14 01:27:38,591 E 9716 9731] logging.cc:361:     @     0x7f996fa6ec85        208  absl::lts_20220623::WriteFailureInfo()
[2023-06-14 01:27:38,592 E 9716 9731] logging.cc:361:     @     0x7f996fa6e9e1         64  absl::lts_20220623::AbslFailureSignalHandler()
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99e81c6420       3408  (unknown)
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99d1c2782e         48  (unknown)
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99d1c2cc0f        240  (unknown)
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99d2267a5f        144  (unknown)
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99d2267d53        128  (unknown)      
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99d21092a0         64  (unknown)
[2023-06-14 01:27:38,593 E 9716 9731] logging.cc:361:     @     0x7f99e81ba609  (unknown)  start_thread                          
Fatal Python error: Segmentation fault                                                                                                                

#                                                                                                                                                     
# A fatal error has been detected by the Java Runtime Environment:                                                                                    
#                                                                                                                                                     
#  SIGSEGV (0xb) at pc=0x00007f99e81c62ab, pid=9716, tid=0x00007f99baa56700                         
#                                                                                                                                                     
# JRE version: OpenJDK Runtime Environment (8.0_362-b09) (build 1.8.0_362-8u372-ga~us1-0ubuntu1~20.04-b09)
# Java VM: OpenJDK 64-Bit Server VM (25.362-b09 mixed mode linux-amd64 compressed oops)             
# Problematic frame:                                                                                                                                  
# C  [libpthread.so.0+0x142ab]  raise+0xcb                                                                                                            
#                                                                                                                                                     
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /ray/hs_err_pid9716.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#
[failure_signal_handler.cc : 332] RAW: Signal 6 raised at PC=0x7f99e800300b while already in AbslFailureSignalHandler()
*** SIGABRT received at time=1686731258 on cpu 0 ***
PC: @     0x7f99e800300b  (unknown)  raise
    @     0x7f996fa6ec85        208  absl::lts_20220623::WriteFailureInfo()
    @     0x7f996fa6e9c8         64  absl::lts_20220623::AbslFailureSignalHandler()
    @     0x7f99e81c6420       3952  (unknown)
    @     0x7f99d22c3843        240  (unknown)
    @     0x7f99d211410e        352  JVM_handle_linux_signal
    @     0x7f99d210731c         64  (unknown)
    @     0x7f99e81c6420      10576  (unknown)
    @     0x7f99d1c2782e         48  (unknown)
    @     0x7f99d1c2cc0f        240  (unknown)
    @     0x7f99d2267a5f        144  (unknown)
    @     0x7f99d2267d53        128  (unknown)
    @     0x7f99d21092a0         64  (unknown)
    @     0x7f99e81ba609  (unknown)  start_thread
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361: *** SIGABRT received at time=1686731258 on cpu 0 ***
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361: PC: @     0x7f99e800300b  (unknown)  raise
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361:     @     0x7f996fa6ec85        208  absl::lts_20220623::WriteFailureInfo()
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361:     @     0x7f996fa6e9e1         64  absl::lts_20220623::AbslFailureSignalHandler()
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361:     @     0x7f99e81c6420       3952  (unknown)
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361:     @     0x7f99d22c3843        240  (unknown)
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361:     @     0x7f99d211410e        352  JVM_handle_linux_signal
[2023-06-14 01:27:38,618 E 9716 9731] logging.cc:361:     @     0x7f99d210731c         64  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99e81c6420      10576  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99d1c2782e         48  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99d1c2cc0f        240  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99d2267a5f        144  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99d2267d53        128  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99d21092a0         64  (unknown)
[2023-06-14 01:27:38,619 E 9716 9731] logging.cc:361:     @     0x7f99e81ba609  (unknown)  start_thread
Fatal Python error: Aborted

Here is the log dump from java:

hs_err_pid9716.log

The segfault occurs almost every time, but not always.

It never occurs when ray is not initialized. Thus there is probably some interference between the ray session/global state and the java/pyarrow/hdfs connection.
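
Because the crash is intermittent, it can help to measure how often it happens. The following is a minimal sketch and not part of the original report: it runs a command repeatedly in fresh processes and tallies how each run ended. The function names and the `repro.py` file name are hypothetical. On POSIX, `subprocess` reports death by a signal as a negative return code, so a run killed by SIGSEGV or SIGABRT is counted under that signal's name.

```python
import collections
import signal
import subprocess
import sys


def classify_exit(returncode: int) -> str:
    """Turn a subprocess return code into a short label."""
    if returncode == 0:
        return "ok"
    if returncode < 0:
        # On POSIX a negative code means the child was killed by that signal.
        try:
            return signal.Signals(-returncode).name
        except ValueError:
            return f"signal {-returncode}"
    return f"exit {returncode}"


def count_outcomes(argv, runs: int = 20) -> collections.Counter:
    """Run `argv` `runs` times and count how each run ended."""
    outcomes = collections.Counter()
    for _ in range(runs):
        proc = subprocess.run(
            argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        outcomes[classify_exit(proc.returncode)] += 1
    return outcomes


# Hypothetical usage, assuming the reproduction script is saved as repro.py:
# print(count_outcomes([sys.executable, "repro.py"], runs=20))
```

Running the same tally with the `ray.init()` call commented out gives a baseline to compare against.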

Versions / Dependencies

Ray latest master, hadoop 3.2.4, java openjdk version "1.8.0_362"

Reproduction script

import os
import sys

import pyarrow
import pyarrow.fs


def setup_hdfs():
    """Set env vars required by pyarrow to talk to hdfs correctly.

    Returns hostname and port needed for the hdfs uri."""

    # the following file is written in `install-hdfs.sh`.
    with open("/tmp/hdfs_env", "r") as f:
        for line in f.readlines():
            line = line.rstrip("\n")
            tokens = line.split("=", maxsplit=1)
            os.environ[tokens[0]] = tokens[1]

    sys.path.insert(0, os.path.join(os.environ["HADOOP_HOME"], "bin"))
    hostname = os.getenv("CONTAINER_ID")
    port = os.getenv("HDFS_PORT")
    return hostname, port

hostname, port = setup_hdfs()

workspace_dir = f'hdfs://{hostname}:{port}/somewhere'

# from ray.air._internal.remote_storage import upload_to_uri 
# upload_to_uri("/tmp/content", workspace_dir)

def get_list_of_files_under_hdfs_folder(hdfs_folder):
    file_sys, file_path = pyarrow.fs.FileSystem.from_uri(hdfs_folder)
    file_infos = file_sys.get_file_info(pyarrow.fs.FileSelector(file_path, recursive=False))
    return file_infos

print(f"Success!, number of files in {workspace_dir}: {len(get_list_of_files_under_hdfs_folder(workspace_dir))}")
print(f"Success!, number of files in {workspace_dir}: {len(get_list_of_files_under_hdfs_folder(workspace_dir))}")

print("initializing ray, and get number of files again.")
import ray
ray.is_initialized()
ray.init()
print("After ray init", len(get_list_of_files_under_hdfs_folder(workspace_dir)))

Issue Severity

High: It blocks me from completing my task.

wxy117 commented 1 year ago

same error here

yydai commented 7 months ago

Any update? same error

redhatdean commented 6 months ago

Same error

kewenkang commented 6 months ago

Any update? I ran into a similar error.

gibchikafa commented 4 months ago

Any update?

pwang697 commented 3 months ago

I've found a way to bypass this error. Ray does not seem to work properly with the default JNI-based libhdfs ($HADOOP_HOME/lib/native/libhdfs.so on Linux), so I replaced it with a native Golang client (GitHub repo: HDFS for Go). I also implemented an fsspec interface based on Pydoop (GitHub repo: Pydoop FSSpec Interface) to replace the pyarrow.fs.HadoopFileSystem module, because Arrow always raises the following error in my use case (I need ray.data.read_parquet to read a 700 MB Parquet file from HDFS, and ray.train.RunConfig to set the persistent storage on HDFS for model artifacts):

/arrow/cpp/src/arrow/status.cc:155: Failed to disconnect hdfs client: IOError: HDFS hdfsFS::Disconnect failed.

Here are my tests:

Environment: a ray cluster on kubernetes deployed by kuberay.

Tasks: benchmarks of xgboost model training and prediction using ray.train and ray.data.

Combinations:

  1. pyarrow + Java version of libhdfs = IOError + core dump
  2. pydoopfsspec + Java version of libhdfs = core dump (same as test 1)
  3. pydoopfsspec + Golang version of libhdfs = job succeeded

Rough inference from the results above:

  1. pyarrow leads to the IOError I encountered.
  2. The core dump is caused by an incompatibility between Ray and the JVM running Hadoop's Java fs code.

More info for anyone who is using ray.train and ray.data and encountering the same errors:

  1. Build the libhdfs binary according to HDFS for Go and install Pydoop FSSpec Interface.
  2. Set environment variables like:
    export LD_PRELOAD="path/to/your/golang/libhdfs.so"
    export HADOOP_USER_NAME=<your HDFS username>
    export LIBHDFS_DEFAULT_FS=<HDFS host>:<HDFS port> (e.g. "hdfs:8020")
    export LIBHDFS_DEFAULT_USER=<your HDFS username>
    export HADOOP_CONF_DIR="/path/to/your/config/dir"
  3. Try the following code:

    import ray
    from ray import data
    from ray.train import RunConfig, ScalingConfig

    # _FRAMEWORK_PARAMS is not shown here; it maps a framework name to a dict
    # with the keys "trainer_cls" and "params".

    def train(
        framework: str, data_path: str, num_workers: int, cpus_per_worker: int
    ) -> ray.train.Result:
        import fsspec
        import pyarrow.fs
        from pydoopfsspec import HadoopFileSystem

        # Wrap the Pydoop-backed fsspec filesystem in a pyarrow filesystem.
        fsspec.register_implementation("pydoop", HadoopFileSystem)
        hdfs = fsspec.filesystem("pydoop")
        fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(hdfs))

        ds = data.read_parquet(data_path, filesystem=fs)
        framework_params = _FRAMEWORK_PARAMS[framework]

        trainer_cls = framework_params["trainer_cls"]

        trainer = trainer_cls(
            params=framework_params["params"],
            scaling_config=ScalingConfig(
                num_workers=num_workers,
                resources_per_worker={"CPU": cpus_per_worker},
                trainer_resources={"CPU": 0},
            ),
            label_column="Label",
            datasets={"train": ds},
            run_config=RunConfig(
                storage_filesystem=fs,
                storage_path="<path to your persistent storage>",
                name=f"{framework}_benchmark",
            ),
        )
        result = trainer.fit()
        return result

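
Since this workaround depends on several environment variables, a quick check before creating the filesystem can save a confusing failure later. The sketch below is an illustration only: the helper name `missing_hdfs_env` is hypothetical, and the variable list is simply the one from the steps above. Note that `LD_PRELOAD` is read by the dynamic loader when a process starts, so it has to be set before Python is launched; setting it from inside a running script has no effect on that process.

```python
import os
from typing import List, Mapping, Optional

# Variable names taken from the setup steps above.
REQUIRED_VARS = (
    "LD_PRELOAD",
    "HADOOP_USER_NAME",
    "LIBHDFS_DEFAULT_FS",
    "LIBHDFS_DEFAULT_USER",
    "HADOOP_CONF_DIR",
)


def missing_hdfs_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Hypothetical usage at the top of a training script:
# missing = missing_hdfs_env()
# if missing:
#     raise RuntimeError(f"Missing HDFS environment variables: {missing}")
```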
GitHub-HongweiZhang commented 2 months ago

Same error here. Is it possible to handle or fix it in ray[data]?

tanmc123 commented 1 month ago

To use pyarrow outside of Ray, I create a separate process that connects to HDFS. This completely isolates the HDFS logic from Ray's usage, which resolves the conflict between the two.

pwang697 commented 1 month ago

To use pyarrow outside of Ray, I create a separate process that connects to HDFS. This completely isolates the HDFS logic from Ray's usage, which resolves the conflict between the two.

@tanmc123 Could you please explain a bit more about your solution? How did you isolate the HDFS logic from Ray's usage? I would appreciate it!

tanmc123 commented 3 weeks ago

I did something like this:

import multiprocessing as mp
import os
import traceback

# The functions below are methods of an HDFS helper class that is not shown.
# HDFSSubProcessException and HDFSUtilException are custom exceptions that are
# not shown either.

def _run_in_subprocess(self, target, *args, **kwargs):
    """Run a function in an isolated process and re-raise its errors."""
    result_queue = mp.Queue()
    p = mp.Process(target=target, args=args + (result_queue,), kwargs=kwargs)
    p.start()
    p.join()

    if not result_queue.empty():
        result = result_queue.get()
        if isinstance(result, Exception):
            main_process_trace = traceback.format_stack()
            main_process_trace = "".join(main_process_trace)
            raise HDFSSubProcessException(result, main_process_trace)
        return result

def _upload(self, local_path, hdfs_path, overwrite, result_queue):
    # Runs in the child process; reports None on success or the exception.
    try:
        if self.exist(path=hdfs_path) and not overwrite:
            raise HDFSUtilException(f"File {hdfs_path} already exists")

        hdfs_dir = os.path.dirname(hdfs_path)
        if not self.exist(path=hdfs_dir):
            self.create_dir(path=hdfs_dir)

        if overwrite and self.exist(path=hdfs_path):
            self.delete(path=hdfs_path, recursive=True)
            self.logger.info(f"Old file {hdfs_path} deleted for overwrite")

        with open(local_path, 'rb') as local_file:
            with self.client.open_output_stream(hdfs_path) as f:
                f.write(local_file.read())
        self.logger.info(f"File {local_path} uploaded to {hdfs_path}")
        result_queue.put(None)
    except Exception as e:
        result_queue.put(e)

def upload(self, **kwargs):
    hdfs_path = kwargs.get("hdfs_path", "")
    local_path = kwargs.get("local_path", "")
    overwrite = kwargs.get("overwrite", False)
    self._run_in_subprocess(self._upload, local_path, hdfs_path, overwrite)
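
For readers who want something self-contained to experiment with, here is a variant of the same isolation idea. It is a sketch under stated assumptions, not the code used above: instead of `multiprocessing.Process` it launches a fresh interpreter with `subprocess`, so the child shares no state with the Ray driver process. The names `run_isolated`, `_CHILD_TEMPLATE`, and `LIST_HDFS` are hypothetical, and arguments and results must be JSON-serializable.

```python
import json
import subprocess
import sys

# Script run by the child: define main(), read kwargs from stdin, print the result.
_CHILD_TEMPLATE = """\
import json, sys
{body}
_payload = json.load(sys.stdin)
json.dump(main(**_payload), sys.stdout)
"""


def run_isolated(body: str, timeout: float = 60.0, **payload):
    """Run `main(**payload)` from `body` in a fresh Python interpreter.

    `body` is source code that defines a function named `main`. It must not
    print to stdout, because stdout carries the JSON-encoded result.
    """
    proc = subprocess.run(
        [sys.executable, "-c", _CHILD_TEMPLATE.format(body=body)],
        input=json.dumps(payload),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"child exited with code {proc.returncode}: {proc.stderr.strip()}"
        )
    return json.loads(proc.stdout)


# Example body: list an HDFS folder with pyarrow, entirely inside the child.
LIST_HDFS = """
def main(uri):
    import pyarrow.fs
    fs, path = pyarrow.fs.FileSystem.from_uri(uri)
    infos = fs.get_file_info(pyarrow.fs.FileSelector(path, recursive=False))
    return [info.path for info in infos]
"""

# Hypothetical usage:
# paths = run_isolated(LIST_HDFS, uri="hdfs://<host>:<port>/somewhere")
```

A crash in the child then surfaces in the parent as a `RuntimeError` rather than taking the parent down, at the cost of starting a new interpreter for each call.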

pwang697 commented 3 weeks ago

I located where the segfault occurred in my use case: the bug was caused by the threads spawned within the pyarrow.fs.copy_files() function. I tested using a thread pool to upload each file in the directory concurrently as a whole rather than in chunks; the bug then occurred only rarely, but it still appeared when I reduced the CPU limit for the Ray workers. Finally, I implemented a method that copies the files in a loop, and I haven't hit the segfault since. I only modified python/ray/train/_internal/storage.py; here is my modification: https://github.com/pwang697/ray/commit/9c766cea211369d091add43044169915abbabfa1

It's worth noting that I am using my customised pydoop filesystem instance as shown in https://github.com/ray-project/ray/issues/36415#issuecomment-2241786012 , which is why line 138 of my code checks whether the filesystem is an instance of pyarrow.fs.PyFileSystem. I haven't tested my method on the default pyarrow.fs.HadoopFileSystem.
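
The idea of copying files in a plain loop can be sketched as follows. This is not the code from the linked commit; it is a minimal illustration with a hypothetical function name, and whether it avoids the segfault in a given setup is untested. It only relies on `open_input_stream`, `open_output_stream`, and `create_dir`, which `pyarrow.fs.FileSystem` objects provide, and it does all work on the calling thread.

```python
import posixpath
from typing import Iterable, Tuple


def copy_files_sequentially(
    src_fs,
    dst_fs,
    pairs: Iterable[Tuple[str, str]],
    chunk_size: int = 4 * 1024 * 1024,
) -> int:
    """Copy each (src_path, dst_path) pair one after another, without threads.

    Returns the total number of bytes copied.
    """
    total = 0
    for src_path, dst_path in pairs:
        # Make sure the destination directory exists before writing.
        parent = posixpath.dirname(dst_path)
        if parent:
            dst_fs.create_dir(parent, recursive=True)
        with src_fs.open_input_stream(src_path) as src, \
                dst_fs.open_output_stream(dst_path) as dst:
            # Stream in chunks so large files are never fully held in memory.
            while True:
                chunk = src.read(chunk_size)
                if not chunk:
                    break
                dst.write(chunk)
                total += len(chunk)
    return total
```

The trade-off is throughput: files are transferred one at a time, so uploads of many small files will be slower than with a threaded copy.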