apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

[Umbrella] Object Storage Support (Help Wanted) #1030

Open jerqi opened 1 year ago

jerqi commented 1 year ago

Describe the proposal

Currently, remote shuffle storage only supports Hadoop-compatible filesystems. Object storage is also important and widely used in big data systems, but object storage systems differ in their implementations: some don't support the list operation, or implement it with poor performance; some require careful choice of file names to spread load across more buckets; and some don't support the append operation. Today we store index and data separately, which would produce many small index files on storage without append, so we should consider merging index files and data files. To get good performance, we should support object storage in a way that adapts to these different systems. https://docs.google.com/document/d/1E88wZA9Yhr-pGeUEfxo6uSgsIXxg_ivPYBNcTOeaaZA/edit


jerqi commented 1 year ago

I'm not familiar with object storage. Could you give me more inputs? @hiboyang @pspoerri @melodyyangaws @zhaohc10 @LantaoJin @yuyang733

jerqi commented 1 year ago

cc @xianjingfeng @zuston Could we finish this issue together?

pspoerri commented 1 year ago

Regarding upload to S3: As long as you use the Apache HDFS S3A adapter you can stream data to an object store. However you can only append as long as you keep the stream open and you can only do so from a single client. The S3A filesystem implementation uses buffered multi-part uploads to stream a file to an object store. Streaming from multiple clients should be possible in principle, but the coordination overhead and the way Java streams are implemented make things tricky.

Regarding .index files: For better performance you can always cache the index files, or serve them from a different location (Redis, Uniffle Coordinator, etc...).

Regarding list support: You can always store the list of objects somewhere else if you want to avoid any expensive file-listing operations. spark-s3-shuffle only uses listings when it needs to delete objects.
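The streaming behavior described above (append is only possible while the stream stays open, and data is flushed in fixed-size parts) can be illustrated with a minimal sketch. This is not the S3A code: `uploadedParts` and `PART_SIZE` are hypothetical stand-ins for the real S3 multipart-upload API and its minimum part size.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of buffered multi-part streaming, assuming a hypothetical
// uploadedParts list in place of the real S3 UploadPart call.
class MultipartStreamSketch {
    static final int PART_SIZE = 5; // real S3 parts are >= 5 MiB; tiny here for illustration

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    final List<byte[]> uploadedParts = new ArrayList<>();
    private boolean closed = false;

    // Appending is only possible while the stream is open, from this single writer.
    void write(byte[] data) {
        if (closed) throw new IllegalStateException("stream closed; object stores have no append");
        buffer.write(data, 0, data.length);
        while (buffer.size() >= PART_SIZE) {
            flushPart();
        }
    }

    // Upload one full-size part and keep the remainder buffered.
    private void flushPart() {
        byte[] all = buffer.toByteArray();
        uploadedParts.add(java.util.Arrays.copyOf(all, PART_SIZE));
        buffer.reset();
        buffer.write(all, PART_SIZE, all.length - PART_SIZE);
    }

    // close() completes the upload; after this the object is immutable.
    void close() {
        if (buffer.size() > 0) {
            uploadedParts.add(buffer.toByteArray());
            buffer.reset();
        }
        closed = true;
    }
}
```

Once `close()` completes the upload the object is immutable, which is why appending from multiple writers to one object is hard to coordinate.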

jerqi commented 1 year ago

> Regarding upload to S3: As long as you use the Apache HDFS S3A adapter you can stream data to an object store. However you can only append as long as you keep the stream open and you can only do so from a single client. The S3A filesystem implementation uses buffered multi-part uploads to stream a file to an object store. Streaming from multiple clients should be possible in principle, but the coordination overhead and the way Java streams are implemented make things tricky.
>
> Regarding .index files: For better performance you can always cache the index files, or serve them from a different location (Redis, Uniffle Coordinator, etc...).
>
> Regarding list support: You can always store the list of objects somewhere else if you want to avoid any expensive file-listing operations. spark-s3-shuffle only uses listings when it needs to delete objects.

Thanks for your input.

zuston commented 1 year ago

> cc @xianjingfeng @zuston Could we finish this issue together?

Yes. I will

> Regarding upload to S3: As long as you use the Apache HDFS S3A adapter you can stream data to an object store. However you can only append as long as you keep the stream open and you can only do so from a single client. The S3A filesystem implementation uses buffered multi-part uploads to stream a file to an object store. Streaming from multiple clients should be possible in principle, but the coordination overhead and the way Java streams are implemented make things tricky.

I hope append can be avoided in this design. And I think it's OK to store the same partition's data across different files in the object store, like this:

```
s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/0.index
s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/0.data
s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/1.index
s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/1.data
...
s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/990.index
s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/990.data
```

One flush from the shuffle server for one partition can be written into one file. This is ensured by the following rule:

  1. A partition must be managed by a single shuffle server, because the numeric file-name prefix is only known to that shuffle server.

The reader can get the endId (s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/endId.data) from the shuffle server, which means no list operation is needed.

If I'm wrong, feel free to point it out.
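A minimal sketch of this naming scheme, assuming a caller-supplied bucket prefix (the helper names here are hypothetical; only the key pattern follows the proposal):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the numbered-flush-file layout proposed above. Helper names are
// illustrative; one flush produces one "<flushId>.data"/"<flushId>.index" pair.
class ShuffleKeyLayout {
    static String dataKey(String base, String appId, int shuffleId, int partitionId, int flushId) {
        return String.format("%s/%s/%d/%d/%d.data", base, appId, shuffleId, partitionId, flushId);
    }

    static String indexKey(String base, String appId, int shuffleId, int partitionId, int flushId) {
        return String.format("%s/%s/%d/%d/%d.index", base, appId, shuffleId, partitionId, flushId);
    }

    // The reader asks the shuffle server for endId and enumerates 0..endId
    // directly, so no object-store list operation is required.
    static List<String> readerDataKeys(String base, String appId, int shuffleId, int partitionId, int endId) {
        List<String> keys = new ArrayList<>();
        for (int id = 0; id <= endId; id++) {
            keys.add(dataKey(base, appId, shuffleId, partitionId, id));
        }
        return keys;
    }
}
```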

xianjingfeng commented 1 year ago

> cc @xianjingfeng @zuston Could we finish this issue together?

It is ok for me.

xianjingfeng commented 1 year ago

> s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/990.data

Would s3a://xxxxxxxxxx/{app_id}/{shuffle_id}/{partition_id}/{shuffle_server_id}/990.data be better?

> Regarding .index files: For better performance you can always cache the index files, or serve them from a different location (Redis, Uniffle Coordinator, etc...)

Agree. And I think we can let users choose whether to store index files in memory or in another external system.

hiboyang commented 1 year ago

Yeah, a lot of small index files will not work well in object storage like S3. It may be a good idea to store them somewhere else. Or is it possible to serve the index files from the Spark driver?

pegasas commented 1 year ago

With my limited experience, I think S3 is a good choice for this issue.

If we choose to support S3, it will be easy to extend to other filesystems (NFS, CIFS, EFS, GCS Fuse, Azure File System) by using a solution like MinIO.

> Yeah, a lot of small index files will not work well in object storage like S3. Maybe a good idea to store it in other places. Or is it possible to serve index files from Spark driver?

I think we may have other solutions for merging small index files, like application-and-practice-of-spark-small-file-merging-function-on-aws-s3?

Feel free to correct me if I am wrong.

jerqi commented 1 year ago

> Yeah, a lot of small index files will not work well in object storage like S3. Maybe good idea to store in other places. Or is it possible to serve index file from Spark driver?

Thanks for your input.

jerqi commented 1 year ago

> With my limited experience, I think s3 is a good choice in this issue.
>
> If we choose to support s3, then it will easily extend to other filesystems (NFS, CIFS, EFS, GCS fuse, Azure File System) by using a solution like MinIO.
>
> Yeah, a lot of small index files will not work well in object storage like S3. Maybe a good idea to store it in other places. Or is it possible to serve index files from Spark driver?
>
> I think we may have other solution for merging small index files like application-and-practice-of-spark-small-file-merging-function-on-aws-s3?
>
> Feel free to correct me if I am wrong.

I will propose a document this weekend. First, we can define only some interfaces. Then we will implement some object storage systems according to the needs of xianjingfeng and zuston.

@xianjingfeng What object system do you want to implement?

jerqi commented 1 year ago

@jiafuzha Do you have extra input?

xianjingfeng commented 1 year ago

> @xianjingfeng What object system do you want to implement?

S3

jiafuzha commented 1 year ago

> @jiafuzha Do you have extra input?

I was on vacation last week. Do we have any interface defined for RemoteStorageManager? I'm looking forward to it, since our DAOS is pure remote storage.

zuston commented 1 year ago

Is this on the roadmap? @jerqi @xianjingfeng

zuston commented 9 months ago

I'm interested in this proposal, and will implement it on the Rust side. @jerqi