ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.11k stars 5.6k forks source link

Ray Data list of json files leading to disk spilling #46810

Open BabyChouSr opened 1 month ago

BabyChouSr commented 1 month ago

What happened + What you expected to happen

What happened

I'm trying to process a directory of json files using Ray Data using batch offline inference classification, but after a couple of datasets are done being processed, the objects start getting spilled to the disk. I'm only processing one directory at a time (I have a backpressure function)

The reason why this is unexpected is because the instances I'm using have 400GB CPU RAM and the object store memory is 119GB for each instance. Unfortunately, the disk memory is only 97GB which is not very much. I would not expect the objects to be spilled to disk because the list of 266 jsonl.gz files is at most about 6 MB each. This means that at a pretty high compression ratio of 5, this would require 7.98GB which is not that high. Also, the output blocks that I'm expecting are only about 200KB each (I basically remove all the text in the output since I just want the model outputs' responses).

Also, the object store memory and local RAM is not being fully utilized: only about 10-30GB of the 119GB.

What I expected to happen

I would expect that the machines put the intermediate and output blocks to the object memory store and then after they are written to the final destination, it is garbage collected. Maybe there is something wrong with reference counting? Or the fact that we have to read everything into memory when reading the files? I'm unsure.

Anyways, thank you!

Versions / Dependencies

Head node was setup with these pip libraries

    pip install "chardet==5.2.0"
    "cryptography"
    "datasets==2.19.0"
    "dill==0.3.8"
    "draccus @ git+https://github.com/dlwh/draccus"
    "fsspec~=2024.3"
    "gcsfs~=2024.3"
    "google-api-python-client==2.129.0"
    "google-cloud-storage-transfer==1.11.3"
    "html2text==2024.2.26"
    "htmlmin==0.1.12"
    "markdownify==0.12.1"
    "multiprocess==0.70.16"
    "py7zr==0.21.0"
    "ray[default]==2.22.0"
    "rbloom==1.5.0"
    "readabilipy==0.2.0"
    "readability-lxml==0.8.1"
    "regex==2024.4.16"
    "requests"
    "tensorflow==2.16.1"
    "trafilatura==1.8.1"
    "tqdm"

Reproduction script

Unfortunately a repro script is not that possible given it relies on external data. But I can give the rough idea without the data.

class JsonFilenameProvider(FilenameProvider):

    def __init__(self, files: List[str], input_dir: str):
        self.files = files
        self.input_dir = input_dir

    def get_filename_for_block(self, block, task_index, block_index):
        input_filename = self.files[task_index]
        output_filename = os.path.basename(input_filename)
        return output_filename

@ray.remote
def process_file_using_actor_pool(input_dir, output_dir, pattern, model_ref, model_local_filepath):
    ctx = ray.data.DataContext.get_current()
    ctx.execution_options.preserve_order = True

    print(f"[*] Reading in dataset {input_dir}")
    print(f"[*] Output directory is {output_dir}")

    # get all files from current directory
    files = fsspec_glob(os.path.join(input_dir, pattern))

    # 1 read task = 1 block = 1 write task
    ds = ray.data.read_json(
        files,
        arrow_open_stream_args={"compression": "gzip"},
        override_num_blocks=len(files),
    ).map_batches(
        AutoClassifier,
        # concurrency=(1,16),
        concurrency=(1, len(files)),
        fn_constructor_args=(model_ref, model_local_filepath),
        batch_size=None,
    ).write_json(output_dir, filename_provider=JsonFilenameProvider(files, input_dir), arrow_open_stream_args={"compression": "gzip"})

def main():
        model_ref = ray.put(bytes([0 * 1024 * 1024 * 2])) # assume I put some 2GB byte buffer here
        subdirectories = fsspec_get_curr_subdirectories(inference_config.input_dir) # assume gets all current subdirectories, equivalent to calling `ls`
        responses = []

        # Copied the ray.wait backpressure pattern here
        for input_subdir in subdirectories:
            if len(responses) > inference_config.task.max_in_flight:  # max in flight set to 1
                ready_refs, responses = ray.wait(responses, num_returns=1)
                ray.get(ready_refs)

            output_subdir = rebase_file_path(inference_config.input_dir, input_subdir, inference_config.output_dir)
            fsspec_mkdirs(output_subdir)

            result_ref = process_file_using_actor_pool.options(
                memory=inference_config.runtime.memory_limit_gb * 1024 * 1024 * 1024, # = 64GB
                runtime_env=RuntimeEnv(
                    pip=inference_config.runtime.requirements_filepath,
                ),
                resources=resources,
            ).remote(input_subdir, output_subdir, "**/*.jsonl.gz", model_ref, model_local_filepath)

            responses.append(result_ref)

if __name__ == "__main__":
    main()

Issue Severity

High: It blocks me from completing my task.

Bye-legumes commented 1 month ago

maybe streaming jsonl reading may help you as it will process the data after reading..I have a PR for that maybe you can check if it can help you? https://github.com/ray-project/ray/pull/46550