apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

[FEATURE] Cache index files on the server side #407

Open xianjingfeng opened 1 year ago

xianjingfeng commented 1 year ago

Describe the feature

If we use AQE, index files will be read many times, so we should cache them in memory.

Motivation

Better performance and lower disk I/O.

Describe the solution

  1. Cache the index files in memory; the specific details are open for discussion.
  2. Filter index data before returning it to the clients.
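A minimal sketch of point 1, assuming a bytes-bounded LRU cache keyed by `appId/shuffleId/partitionId`. The class name, key layout, and capacity handling are illustrative assumptions, not Uniffle's actual API:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: in-memory LRU cache for shuffle index files, bounded by total bytes
// rather than entry count. Names and key format are illustrative only.
public class IndexFileCache {
    private final long capacityBytes;
    private long usedBytes = 0;
    // access-order LinkedHashMap gives LRU iteration order (eldest first)
    private final LinkedHashMap<String, byte[]> entries =
        new LinkedHashMap<>(16, 0.75f, true);

    public IndexFileCache(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // Hypothetical key format: appId/shuffleId/partitionId
    public synchronized void put(String key, byte[] indexData) {
        byte[] old = entries.put(key, indexData);
        if (old != null) {
            usedBytes -= old.length;
        }
        usedBytes += indexData.length;
        evictIfNeeded();
    }

    public synchronized byte[] get(String key) {
        return entries.get(key); // refreshes LRU position on a hit
    }

    private void evictIfNeeded() {
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (usedBytes > capacityBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            usedBytes -= eldest.getValue().length;
            it.remove();
        }
    }
}
```

Bounding by bytes rather than entry count matters here because index files for different partitions can vary widely in size.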

Additional context

No response

xianjingfeng commented 1 year ago

PTAL @jerqi @zuston

advancedxy commented 1 year ago

Do you observe any performance issue for this case?

If the index file is only accessed a few times (say, once or twice), there's no need to cache it. And normally, the Linux page cache will speed up file access anyway.

If the shuffle server is indeed affected by this problem, I believe we may need an IndexFileAccessManager to trigger the index file caching and manage eviction.
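One way such an IndexFileAccessManager could work is to promote an index into memory only after it has been read more than a threshold number of times, so cold indexes are never cached. This is a sketch of that idea; the class shape, threshold, and loader callback are all assumptions, not existing Uniffle code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Sketch: access-pattern-triggered caching. An index file is only kept in
// memory once its read count reaches a threshold; earlier reads go to disk.
public class IndexFileAccessManager {
    private final int promoteThreshold;
    private final Map<String, Integer> accessCounts = new ConcurrentHashMap<>();
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();

    public IndexFileAccessManager(int promoteThreshold) {
        this.promoteThreshold = promoteThreshold;
    }

    // loader stands in for the actual disk read of the index file
    public byte[] read(String key, Supplier<byte[]> loader) {
        byte[] cached = cache.get(key);
        if (cached != null) {
            return cached; // hot index served from memory, no disk I/O
        }
        int count = accessCounts.merge(key, 1, Integer::sum);
        byte[] data = loader.get();
        if (count >= promoteThreshold) {
            cache.put(key, data); // promote: this index is clearly hot
            accessCounts.remove(key);
        }
        return data;
    }

    public boolean isCached(String key) {
        return cache.containsKey(key);
    }
}
```

Eviction (by size or TTL) would still be needed on top of this; it is omitted to keep the sketch short.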

zuston commented 1 year ago

I think this will bring some memory pressure, especially for some huge partitions.

xianjingfeng commented 1 year ago

Do you observe any performance issue for this case?

No, I just think this should be an optimization point.

If the index file is only accessed a few times (say, once or twice), there's no need to cache it.

We can just cache the partitions of applications that enable AQE.

And normally, the Linux page cache will speed up file access anyway.

But the Linux page cache will cache the data files too.

xianjingfeng commented 1 year ago

I think this will bring some memory pressure, especially for some huge partitions.

I counted the files on a server in our production environment. On average, about 10 MB of data generates 1 KB of index. That means that if we have 10 TB of data, there is about 1 GB of index files. If we compress the index before storing it in memory, I think it's acceptable.
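The estimate above can be sanity-checked with a one-liner; the 10 MB : 1 KB ratio comes from the comment, not from measurements here:

```java
// Quick arithmetic check of the estimate: 1 KB of index per 10 MB of data.
public class IndexSizeEstimate {
    public static long estimateIndexBytes(long dataBytes) {
        // 10 MB of data per 1 KB of index => ratio of 10240 : 1
        long ratio = (10L << 20) / (1L << 10);
        return dataBytes / ratio;
    }
}
```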

advancedxy commented 1 year ago

We can just cache the partitions of applications that enable AQE.

AQE is enabled by default in later Spark versions such as Spark 3.3, and it may also be turned on by default in some production environments. So simply caching index files for all AQE applications might not be selective enough; the caching should probably still be triggered by the access pattern.

Also, another question: in which cases would AQE trigger reading the same partition many times? I know OptimizeSkewedJoin will split the same partition into multiple parts and therefore trigger the behavior you described. Are there any other cases?

xianjingfeng commented 1 year ago

Also, another question: in which cases would AQE trigger reading the same partition many times? I know OptimizeSkewedJoin will split the same partition into multiple parts and therefore trigger the behavior you described. Are there any other cases?

rebalance

zuston commented 1 year ago

About 10 MB of data on average generates 1 KB of index. That means that if we have 10 TB of data, there is about 1 GB of index files.

Emm... that's too big. For a huge partition, 200 GB of data is normal.

zuston commented 1 year ago

Besides, we'd better have a unified memory manager in the shuffle server if we use this cache.

advancedxy commented 1 year ago

Also, another question: in which cases would AQE trigger reading the same partition many times? I know OptimizeSkewedJoin will split the same partition into multiple parts and therefore trigger the behavior you described. Are there any other cases?

rebalance

Could you elaborate on it a bit more? Do you mean OptimizeSkewInRebalancePartitions? I just checked the code, and I believe the optimization rule is only applied when the user rebalances the output and there are indeed some skewed partitions.

xianjingfeng commented 1 year ago

Emm... that's too big. For a huge partition, 200 GB of data is normal.

I mean that even if we cache all the local index files, we won't use too much memory. I think 3 GB is enough for most servers. For us, the bottleneck is the network and the disks, not memory.

Besides, we'd better have a unified memory manager in the shuffle server if we use this cache.

You are right. And I think we should do this after #133 .

xianjingfeng commented 1 year ago

Do you mean OptimizeSkewInRebalancePartitions? I just checked the code, and I believe the optimization rule is only applied when the user rebalances the output and there are indeed some skewed partitions.

Yes. A reduce task will read the index files of all partitions. You can try running a job and tracing the logs.

advancedxy commented 1 year ago

Yes. A reduce task will read the index files of all partitions. You can try running a job and tracing the logs.

This doesn't seem right. Do you have a minimal job that reproduces this case?

xianjingfeng commented 1 year ago

This doesn't seem right. Do you have a minimal job that reproduces this case?

https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala#L228

xianjingfeng commented 1 year ago

If we cache it, we don't even need to flush it to the disk. @zuston @jerqi @advancedxy

advancedxy commented 1 year ago

https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala#L228

I didn't get a chance to reproduce this case. I will do it this week.

I still think it should be cached only when necessary, and selectively.

If we cache it, we don't even need to flush it to the disk

Well, I'm for this idea, as I'd like to reduce small I/Os to disk as much as possible. But memory is still an important factor to consider, as we want to deploy Uniffle shuffle servers with smaller memory, say 32 GB or 48 GB.

xianjingfeng commented 1 year ago

Well, I'm for this idea, as I'd like to reduce small I/Os to disk as much as possible. But memory is still an important factor to consider, as we want to deploy Uniffle shuffle servers with smaller memory, say 32 GB or 48 GB.

This feature can be optional.
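Making it optional could be as simple as a server-side switch plus a memory bound. These property names are hypothetical, chosen only to match the `rss.server.*` naming style; they are not existing Uniffle configuration keys:

```properties
# Hypothetical keys -- illustrative only, not actual Uniffle configuration
rss.server.index.cache.enabled = true
# Upper bound on memory used by the index cache
rss.server.index.cache.capacity = 3gb
```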

xianjingfeng commented 1 year ago

https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala#L228

I didn't get a chance to reproduce this case. I will do it this week.

I still think it should be cached only when necessary, and selectively.

You can look at this method and pay attention to startMapIndex, endMapIndex, startPartition and endPartition when debugging the above UT: https://github.com/apache/spark/blob/0fd1f85f16502d1d4222cf3d7abfd5e2b86464e6/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L124
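Those parameters are also what solution point 2 (server-side filtering) would act on: the server returns only the index entries in the `[startMapIndex, endMapIndex)` slice an AQE skew split asks for, instead of the whole index. A sketch, assuming an illustrative entry layout that does not match Uniffle's actual index segment format:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: filter index entries server-side so only the map-index range
// requested by the client is returned. Entry fields are illustrative.
public class IndexFilter {
    public static class IndexEntry {
        public final long offset;   // byte offset of the block in the data file
        public final int length;    // block length in bytes
        public final int mapIndex;  // which map task produced this block

        public IndexEntry(long offset, int length, int mapIndex) {
            this.offset = offset;
            this.length = length;
            this.mapIndex = mapIndex;
        }
    }

    public static List<IndexEntry> filterByMapRange(
            List<IndexEntry> entries, int startMapIndex, int endMapIndex) {
        List<IndexEntry> result = new ArrayList<>();
        for (IndexEntry e : entries) {
            if (e.mapIndex >= startMapIndex && e.mapIndex < endMapIndex) {
                result.add(e);
            }
        }
        return result;
    }
}
```

Filtering on the server also shrinks the response payload, which compounds the benefit of caching the index in memory.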

maobaolong commented 2 weeks ago

Is it possible to store the index in RocksDB? RocksDB is well known for storing metadata in distributed storage systems, e.g. Alluxio and Ozone.

Or we could store the index files in a distributed KV store, for example TiKV?
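Whatever backing store is chosen, the interface mainly needs a keying scheme. A tiny sketch, where a HashMap stands in for RocksDB/TiKV and the `appId/shuffleId/partitionId` key layout is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: key index data by appId/shuffleId/partitionId so it could live in
// a KV store such as RocksDB or TiKV. A HashMap stands in for the real store.
public class KvIndexStore {
    private final Map<String, byte[]> store = new HashMap<>();

    static String key(String appId, int shuffleId, int partitionId) {
        return appId + "/" + shuffleId + "/" + partitionId;
    }

    public void putIndex(String appId, int shuffleId, int partitionId, byte[] index) {
        store.put(key(appId, shuffleId, partitionId), index);
    }

    public byte[] getIndex(String appId, int shuffleId, int partitionId) {
        return store.get(key(appId, shuffleId, partitionId));
    }
}
```

With a store that orders keys lexicographically (as RocksDB does), an `appId/` prefix scan would also make cleanup cheap when an application finishes.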