ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

`ray.data.read_parquet()` does not support GCS path #17501

Closed MattDelac closed 3 years ago

MattDelac commented 3 years ago

Problem

ray.data.read_parquet() does not support GCS paths, even when another filesystem such as GCSFileSystem is passed explicitly.

How to reproduce the problem

Ray cluster created on Kubernetes
Python version: python:3.7.9-buster
Ray version used: wheel based on commit 58423e6

Data stored

Python script to generate data

# ./local_folder/generate_data.py
import pandas as pd
FOLDER = "gs://my_bucket/input_folder"

for i in range(5):
    df = pd.DataFrame(data={
        "id": [i] * 100,
        "value": list(range(100)),
    })
    df.to_parquet(f"{FOLDER}/part-{i:05d}.parquet")

Python job

Create a Python job that contains the following:

# ./local_folder/ray_example_using_gcs.py
import os
import ray
import modin.pandas as pd
from gcsfs import GCSFileSystem

INPUT_FOLDER = "gs://my_bucket/input_folder"
OUTPUT_FOLDER = "gs://my_bucket/output_folder"

def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(
        new_value=lambda df: df["value"] * 2
    )

@ray.remote
def preprocess_and_save(file_path, output_folder, fs):
    df = ray.data.read_parquet(paths=file_path, filesystem=fs)
    df = df.map_batches(fn=preprocess_data)
    df.write_parquet(path=os.path.join(output_folder, file_path.split("/")[-1]), filesystem=fs)

def main():
    fs = GCSFileSystem()
    files = fs.ls(INPUT_FOLDER)
    futures = [preprocess_and_save.remote(file_path=file_path, output_folder=OUTPUT_FOLDER, fs=fs) for file_path in files]
    ray.get(futures)

if __name__ == "__main__":
    ray.init("ray://docker.for.mac.host.internal:10001")
    try:
        main()
    finally:
        # Always disconnect from the cluster, even if main() fails.
        ray.shutdown()

Error

    raise err
types.RayTaskError(ValueError): ray::preprocess_and_save() (pid=884, ip=10.6.0.177)
  File "python/ray/_raylet.pyx", line 535, in ray._raylet.execute_task
  File "/usr/local/lib/python3.7/site-packages/clv/matt_preprocessing_example.py", line 16, in preprocess_and_save
  File "/usr/local/lib/python3.7/site-packages/ray/experimental/data/read_api.py", line 165, in read_parquet
    **arrow_parquet_args)
  File "/usr/local/lib/python3.7/site-packages/ray/experimental/data/read_api.py", line 118, in read_datasource
    read_tasks = datasource.prepare_read(parallelism, **read_args)
  File "/usr/local/lib/python3.7/site-packages/ray/experimental/data/datasource/parquet_datasource.py", line 39, in prepare_read
    paths, filesystem)
  File "/usr/local/lib/python3.7/site-packages/ray/experimental/data/datasource/file_based_datasource.py", line 169, in _resolve_paths_and_filesystem
    raise ValueError("All paths must use same filesystem.")
ValueError: All paths must use same filesystem.

This happens because Ray does not support GCS paths, even though we pass a GCSFileSystem as the filesystem. Because Ray does not handle GCS natively, _resolve_paths_and_filesystem makes assumptions about when a path should be resolved as a "local path".
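To illustrate the kind of check that can produce this error, here is a simplified sketch. It is not Ray's or pyarrow's actual code: the `KNOWN_SCHEMES` table, `infer_filesystem`, and `resolve` are hypothetical names, and filesystems are modelled as plain strings. It only shows how a path without a recognised scheme can be inferred as local and then rejected because it does not match the filesystem that was passed in.

```python
from urllib.parse import urlparse

# Hypothetical scheme table for illustration only; not Ray's or pyarrow's real mapping.
KNOWN_SCHEMES = {"": "local", "file": "local", "s3": "s3", "hdfs": "hdfs"}


def infer_filesystem(path):
    """Guess a filesystem label from the URI scheme of `path`."""
    scheme = urlparse(path).scheme
    if scheme not in KNOWN_SCHEMES:
        raise ValueError(f"Unrecognized scheme: {scheme!r}")
    return KNOWN_SCHEMES[scheme]


def resolve(paths, filesystem=None):
    """Return (paths, filesystem), rejecting paths whose inferred filesystem differs."""
    for path in paths:
        inferred = infer_filesystem(path)
        if filesystem is None:
            # No filesystem supplied: adopt the one inferred from the first path.
            filesystem = inferred
        elif inferred != filesystem:
            # A scheme-less path is inferred as "local", which does not match "gcs".
            raise ValueError("All paths must use same filesystem.")
    return list(paths), filesystem


if __name__ == "__main__":
    try:
        resolve(["my_bucket/input_folder/part-00000.parquet"], filesystem="gcs")
    except ValueError as err:
        print(f"ValueError: {err}")
```

In this toy model the check fails before the supplied filesystem is ever asked whether it can open the path, so the message says nothing about which path or which filesystem caused the mismatch.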

ericl commented 3 years ago

This is related to https://github.com/ray-project/ray/issues/17325:

  1. _resolve_paths_and_filesystem is not working properly (and the error message is also bad).
  2. The parquet reader shouldn't be using _resolve_paths_and_filesystem in the first place (#17325).
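One possible direction for the first point, sketched under assumptions: when the caller passes a filesystem explicitly, trust it and only strip any URI scheme from the paths instead of re-inferring a filesystem per path. The helpers `unwrap_scheme` and `resolve_with_explicit_filesystem` are hypothetical names, not Ray APIs, and this is not necessarily how the issue was fixed.

```python
from urllib.parse import urlparse


def unwrap_scheme(path):
    """Drop a leading 'scheme://' so the path can be handed to an explicit filesystem."""
    parsed = urlparse(path)
    if not parsed.scheme:
        # Already scheme-less, e.g. 'my_bucket/input_folder/part-00000.parquet'.
        return path
    return parsed.netloc + parsed.path


def resolve_with_explicit_filesystem(paths, filesystem):
    """Trust the caller's filesystem; normalise paths without inferring anything."""
    if isinstance(paths, str):
        paths = [paths]
    return [unwrap_scheme(p) for p in paths], filesystem
```

With this approach, "gs://my_bucket/input_folder/part-00000.parquet" and "my_bucket/input_folder/part-00000.parquet" resolve to the same path, and the supplied filesystem object is returned unchanged.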