apache / gravitino

World's most powerful open data catalog for building a high-performance, geo-distributed and federated metadata lake.
https://gravitino.apache.org
Apache License 2.0

[Subtask] Add Python Arrow FileSystem implementation for fileset. #2059

Closed coolderli closed 5 months ago

coolderli commented 9 months ago

Describe the subtask

Add Python Arrow FileSystem implementation for fileset.

Parent issue

#1241

coolderli commented 9 months ago

@jerryshao I just created this subtask so we can talk more about it here. I'm wondering whether we should support a Python client. In the Python Arrow FileSystem, we have to load the fileset and get its physical storage location.

jerryshao commented 9 months ago

Yeah, I think so. We should have a Python client first.

coolderli commented 9 months ago

@jerryshao I found that the client module depends on the api and common modules. Do we have to implement them in Python again, or can we use some bridge like Py4j to invoke them? We may also need a Go or C client later. Can you share your thoughts? Thanks.

I think the Ray engine (https://github.com/datastrato/gravitino/issues/1355) also needs a Python library. Are we already working on this?

jerryshao commented 9 months ago

We planned to do the Python library, but we haven't kicked it off yet.

Off the top of my head, since we use a REST protocol to communicate with the server, we don't have to use Py4j to bridge to the Java code. One possible way is to write a Python counterpart.

Also, I was thinking that since we are using a REST protocol, maybe we can build a model and use code-generation tools to generate SDKs for different languages.
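
For illustration, a minimal sketch of what a hand-written Python counterpart might look like, assuming the server exposes a fileset-load endpoint; the URL path and the response field below are assumptions for illustration, not the confirmed Gravitino REST API:

```python
# Minimal sketch of a hand-written Python client that loads a fileset
# over REST. The endpoint path and the "storageLocation" field are
# assumptions for illustration, not the confirmed server API.
import requests


class GravitinoClient:
    def __init__(self, server_uri: str, metalake: str):
        self._base = f"{server_uri}/api/metalakes/{metalake}"

    def load_fileset(self, catalog: str, schema: str, fileset: str) -> dict:
        # Hypothetical endpoint returning fileset metadata, including
        # its physical storage location.
        url = f"{self._base}/catalogs/{catalog}/schemas/{schema}/filesets/{fileset}"
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()


if __name__ == "__main__":
    client = GravitinoClient("http://localhost:8090", "gravitino_test")
    fileset = client.load_fileset("catalog", "schema", "fileset")
    print(fileset.get("storageLocation"))  # assumed field name
```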

coolderli commented 6 months ago

@jerryshao @xunliu @shaofengshi Hi, I'd like to discuss how to implement Python gvfs. After talking with @xloya offline, I think we have two options.

  1. Implement a GravitinoFileSystem in Python. The GravitinoFileSystem would inherit from pyarrow.fs.FileSystem and proxy the real FileSystem, such as pyarrow.fs.HadoopFileSystem or pyarrow.fs.S3FileSystem. More FileSystem implementations can be found in this doc. With this approach we have to implement a new FileSystem, and the real implementation depends on pyarrow; if certain storage protocol types are missing, we would need to implement them on the pyarrow side. (A rough sketch of this proxying idea follows at the end of this comment.)
  2. Just use pyarrow.fs.HadoopFileSystem to read gvfs. Gvfs is compatible with the Hadoop protocol, so we can use pyarrow.fs.HadoopFileSystem.from_uri("gvfs://fileset/catalog/schema/fileset") to read the data. This way, we would also use the Hadoop protocol to read object storage, which is consistent with the Java usage. However, some parameter-passing issues arise in practice, so we need to consider whether it is sufficiently user-friendly.

By the way, I saw the issues about the tensorflow-connector and pytorch-connector. Will we provide some higher-level APIs for greater ease of use? Can you share your thoughts? Thanks.
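
To make option 1 concrete, here is a rough sketch of the proxying idea, assuming the fileset's physical storage location has already been obtained from Gravitino (the path mapping below is hard-coded purely for illustration):

```python
# Illustrative sketch of option 1: map a gvfs virtual path onto the
# fileset's physical storage location, then let PyArrow choose the
# matching native filesystem for that location.
from pyarrow import fs


def resolve(virtual_path: str, storage_location: str):
    """Translate gvfs://fileset/<catalog>/<schema>/<fileset>/... into a
    (filesystem, physical_path) pair."""
    # A real GravitinoFileSystem would look up storage_location through
    # the Gravitino client instead of taking it as an argument.
    prefix = "gvfs://fileset/catalog/schema/fileset"
    physical = virtual_path.replace(prefix, storage_location, 1)
    # from_uri dispatches to S3FileSystem, HadoopFileSystem,
    # GcsFileSystem, LocalFileSystem, ... based on the URI scheme.
    return fs.FileSystem.from_uri(physical)


# Example with a hypothetical S3-backed fileset:
filesystem, path = resolve(
    "gvfs://fileset/catalog/schema/fileset/date=20240507/part-0.parquet",
    "s3://my-bucket/warehouse/fileset",
)
```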

xloya commented 6 months ago

> @jerryshao @xunliu @shaofengshi Hi, I'd like to discuss how to implement Python gvfs. After talking with @xloya offline, I think we have two options. [...]

Here is some test code for solution 2; it has been preliminarily verified and works:

```python
if __name__ == "__main__":
    from pyarrow import fs

    # Hadoop configuration pointing the gvfs scheme at the Java GVFS
    # implementation and the Gravitino server.
    configs = {
        'fs.gvfs.impl': 'com.datastrato.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem',
        'fs.AbstractFileSystem.gvfs.impl': 'com.datastrato.gravitino.filesystem.hadoop.Gvfs',
        'fs.gravitino.server.uri': 'http://localhost:8090',
        'fs.gravitino.client.metalake': 'gravitino_test',
        'fs.gravitino.client.authType': 'simple'
    }
    config_string = '&'.join([f"{key}={value}" for key, value in configs.items()])

    concat_uri = "{}?{}".format("gvfs://fileset", config_string)
    gvfs = fs.HadoopFileSystem.from_uri(uri=concat_uri)

    # read a parquet file
    import pyarrow.parquet as pq
    table = pq.read_table(
        "gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507/part-00001-6a1fc414-798b"
        "-4d2f-a6d3-4f7c39191f76-c000.snappy.parquet", filesystem=gvfs)
    print(table.to_batches())

    # list file info
    file_info = gvfs.get_file_info(
        fs.FileSelector("gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507"))
    for info in file_info:
        print(info)

    # open a file and download its content
    with gvfs.open_input_stream(
            "gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507/part-00001-6a1fc414-798b-4d2f-a6d3"
            "-4f7c39191f76-c000.snappy.parquet") as file:
        inputs = file.readall()
        print(inputs)

    # write a file
    import pyarrow as pa
    table_schema = pa.schema([
        ("id", pa.int32()),
        ("name", pa.string()),
        ("age", pa.int32())
    ])
    with gvfs.open_output_stream(
            "gvfs://fileset/fileset_test_catalog/tmp/test_fileset_3/date=20240507/test.arrow") as file:
        with pa.RecordBatchFileWriter(file, table_schema) as writer:
            writer.write_table(table)
```

jerryshao commented 6 months ago

Thanks @coolderli @xloya for your input. As for how to support Arrow/Ray, I need to investigate a bit more; just give me some time, appreciated.

noidname01 commented 6 months ago

Hi @jerryshao @coolderli @xloya, I would love to help with this. Is there anything I can do? I have already tried solution 2 above and it does work.

jerryshao commented 6 months ago

I feel like solution 2 is a Python wrapper for the Hadoop client, while solution 1 is a pure Python solution, right? I feel solution 1 could be more generic; what do you think?

xloya commented 6 months ago

> I feel like solution 2 is a Python wrapper for the Hadoop client, while solution 1 is a pure Python solution, right? [...]

According to my understanding and research, solution 2 reuses the capabilities of PyArrow's HadoopFileSystem, and its call path is PyArrow.HadoopFileSystem -> libhdfs.so -> GVFS. In solution 1, what we need to build is another FileSystem similar to PyArrow.HadoopFileSystem, say a PyArrow.GravitinoFileSystem. If we need to access HDFS from PyArrow.GravitinoFileSystem, we still need a Hadoop environment in the Python environment, so we still go through PyArrow.GravitinoFileSystem -> libhdfs.so -> HDFS.

coolderli commented 6 months ago

> According to my understanding and research, solution 2 reuses the capabilities of PyArrow's HadoopFileSystem, and its call path is PyArrow.HadoopFileSystem -> libhdfs.so -> GVFS. [...]

@xloya Yes. In solution 1, object storage such as S3 would use PyArrow.S3FileSystem rather than PyArrow.HadoopFileSystem. I think it's more native, since it doesn't need the Hadoop configuration. Considering cloud-native environments, I think solution 1 may be better.
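
For example, once the physical location is known, an S3-backed fileset could be read directly through PyArrow's native S3 support, with no Hadoop environment at all (bucket, region, and path here are hypothetical):

```python
# Direct object-storage access without Hadoop or libhdfs.so: the
# physical S3 location resolved from the fileset is read through
# pyarrow.fs.S3FileSystem. All names here are hypothetical.
import pyarrow.parquet as pq
from pyarrow import fs

s3 = fs.S3FileSystem(region="us-east-1")
table = pq.read_table("my-bucket/warehouse/fileset/part-0.parquet",
                      filesystem=s3)
print(table.num_rows)
```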

noidname01 commented 6 months ago

I also think solution 1 is better. The Hadoop configuration is redundant when we only need other storage like S3, and solution 2 depends on the Hadoop native libraries (a.k.a. libhdfs.so), which have to be built from source and manually configured. I think that's not user-friendly for non-HDFS users.

xloya commented 6 months ago

@noidname01 Hi, we had a brief discussion on this issue with @jerryshao yesterday, and we agreed on solution 1. With it we can connect directly to storage SDKs such as S3 / OSS, but I think HDFS also needs support, so I will investigate the feasibility of solution 1 first. If you are interested, please participate in the subsequent design and development. Thanks!

noidname01 commented 6 months ago

@xloya Sounds great, I'm in👍

xloya commented 6 months ago

@jerryshao @noidname01 @coolderli Hi, I have opened a draft PR (#3528, the code is not complete yet) for this. I will implement GVFS based on the fsspec interfaces; some popular cloud storage providers and companies have also chosen this approach. I will support only HDFS at first and extend it to more storages and auth types in follow-up sub-tasks (@noidname01, you could participate in these). Do you have any additional feedback on this?
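
For reference, a rough skeleton of what an fsspec-based GVFS could look like; the class name and options below are illustrative, not the actual code in the draft PR:

```python
# Skeleton of an fsspec-based GVFS: subclass AbstractFileSystem,
# resolve the virtual path, and delegate to the underlying fsspec
# filesystem. Names and options here are illustrative only.
import fsspec
from fsspec import AbstractFileSystem


class GravitinoVirtualFileSystem(AbstractFileSystem):
    protocol = "gvfs"

    def __init__(self, server_uri=None, metalake_name=None, **kwargs):
        super().__init__(**kwargs)
        self._server_uri = server_uri
        self._metalake = metalake_name

    def _resolve(self, path):
        # Map the virtual fileset path to its physical location and
        # return (underlying fsspec filesystem, physical path). The
        # lookup through the Gravitino client is omitted in this sketch.
        raise NotImplementedError

    def ls(self, path, detail=True, **kwargs):
        inner_fs, inner_path = self._resolve(path)
        return inner_fs.ls(inner_path, detail=detail, **kwargs)

    def _open(self, path, mode="rb", **kwargs):
        inner_fs, inner_path = self._resolve(path)
        return inner_fs._open(inner_path, mode=mode, **kwargs)


# Registering the implementation lets fsspec dispatch on the scheme.
fsspec.register_implementation("gvfs", GravitinoVirtualFileSystem,
                               clobber=True)
```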

coolderli commented 6 months ago

> Hi, I have opened a draft PR (#3528, the code is not complete yet) for this. I will implement GVFS based on the fsspec interfaces [...]

@xloya Can you explain more about why you chose fsspec rather than the PyArrow FileSystem? Then everyone can provide more feedback. One reason to choose PyArrow is that Arrow integrates conveniently with other engines. Could you share some code showing how fsspec achieves that?

noidname01 commented 6 months ago

Ray supports the PyArrow filesystem, for example, but I'm not sure it supports fsspec as well 🤔

xloya commented 6 months ago

> @xloya Can you explain more about why you chose fsspec rather than the PyArrow FileSystem? [...]

I considered reasons from the following aspects:

  1. Interface diversity: fsspec provides a richer interface implementation and is a lower-level file interface definition.
  2. Storage system support: fsspec currently supports more of the mainstream cloud storage systems (S3/Azure/GCS/OSS) than PyArrow (S3/GCS).
  3. fsspec has very good compatibility with PyArrow: any file system implemented against fsspec can be used from PyArrow; see the docs on the Arrow site (https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow) and the sketch below.
  4. Open-source fsspec implementations are more abundant: cloud storage vendors and data companies alike tend to implement file systems through fsspec first. I currently cannot find additional open-source file system implementations beyond those in the PyArrow project.
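
Here is reason 3 in practice: an fsspec filesystem can be handed to PyArrow through PyFileSystem and FSSpecHandler, so choosing fsspec does not give up Arrow integration. This assumes the gvfs implementation registered in the earlier sketch, and the path and options are hypothetical:

```python
# Wrapping an fsspec filesystem for PyArrow via PyFileSystem +
# FSSpecHandler, as described in the Arrow docs linked above. The
# "gvfs" scheme and its options are assumed from the earlier sketch.
import fsspec
import pyarrow.parquet as pq
from pyarrow.fs import PyFileSystem, FSSpecHandler

gvfs = fsspec.filesystem("gvfs", server_uri="http://localhost:8090",
                         metalake_name="gravitino_test")
arrow_fs = PyFileSystem(FSSpecHandler(gvfs))
table = pq.read_table("fileset/catalog/schema/fileset/part-0.parquet",
                      filesystem=arrow_fs)
```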
xloya commented 6 months ago

> Ray supports the PyArrow filesystem, for example, but I'm not sure it supports fsspec as well 🤔

I think it is theoretically possible: judging from the descriptions in the fsspec and PyArrow documentation, the two kinds of file systems are compatible with each other. In addition, fsspec is natively supported by pandas, so I think Ray can analyze and process data through pandas.
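
As a sketch of the pandas integration, once a "gvfs" implementation is registered with fsspec, pandas resolves the scheme automatically; the path and storage options below are hypothetical:

```python
# pandas dispatches URL schemes through fsspec, so a registered "gvfs"
# implementation works out of the box. Path and options are assumed.
import pandas as pd

df = pd.read_parquet(
    "gvfs://fileset/catalog/schema/fileset/date=20240507/part-0.parquet",
    storage_options={"server_uri": "http://localhost:8090",
                     "metalake_name": "gravitino_test"},
)
print(df.head())
```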

xloya commented 6 months ago

TensorFlow and PyTorch Lightning natively support fsspec-compatible file systems: 1. https://github.com/tensorflow/tensorboard/pull/5248 2. https://lightning.ai/docs/pytorch/stable/common/remote_fs.html

jerryshao commented 6 months ago

@xloya do you have any documents about fsspec? Maybe we should discuss the pros and cons of fsspec, the PyArrow filesystem, and other options in more detail.

xloya commented 6 months ago

> @xloya do you have any documents about fsspec? Maybe we should discuss the pros and cons of fsspec, the PyArrow filesystem, and other options in more detail.

Yeah, I will post a doc tomorrow.

xloya commented 6 months ago

@jerryshao @noidname01 @coolderli Hi, I have opened a document on the implementation options. Please take a look and comment if you have any questions: https://docs.google.com/document/d/1y1GX7HWha1DH6XFU7VC_0NzmOoTA9j3Ppw1yEzjPpf8/edit?usp=sharing