apache / incubator-uniffle

Uniffle is a high performance, general purpose Remote Shuffle Service.
https://uniffle.apache.org/
Apache License 2.0

[Umbrella] Remote merge on the shuffle server side. #1239

Open zhengchenyu opened 11 months ago

zhengchenyu commented 11 months ago

Describe the proposal

Some RSS jobs may run on low-storage machines, and we want to avoid using local disk as much as possible. In fact, the merge step of the shuffle phase is the heaviest user of local disk. There are two ways to solve this problem:

  1. Spill merged data to a remote file system.
  2. Merge on the shuffle server side.

This issue will describe the second proposal.

Task list

TODO:


zuston commented 10 months ago

This optimization could also be applied to Spark. One concern is whether it will put more pressure on the shuffle server, such as IO/CPU?

zhengchenyu commented 8 months ago

1 Default shuffle

Note: The first chapter briefly introduces the principle of the default shuffle, with the purpose of finding where local disks are used so that remote merge can be designed accordingly. If you are already familiar with this, you can skip this part.

We will analyze the shuffle of MapReduce, Tez, and Spark in turn.

1.1 MapReduce

The map writes records to memory. When memory usage exceeds the threshold, the in-memory data is spilled to a disk file, with records written in order of partition id + key. After the map has processed all records, it spills the data still in memory to a disk file, then reads all spilled files and merges them in order of partition id + key to obtain the sorted records.

Note: The purpose of sorting by partition id is that when the reduce side fetches data from the map side, it can read as sequentially as possible. For MR, Tez, and Spark, regardless of whether the records themselves are sorted, as long as the data is partitioned it must be ordered by partition id.
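
To make this concrete, here is a minimal, hedged sketch (illustrative types, not MapReduce's actual spill code) of ordering records by partition id first and key second:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SpillOrderExample {
    // Illustrative only: a record tagged with the partition it belongs to.
    record SpillRecord(int partitionId, String key, String value) {}

    public static void main(String[] args) {
        List<SpillRecord> records = Arrays.asList(
            new SpillRecord(1, "b", "v"),
            new SpillRecord(0, "c", "v"),
            new SpillRecord(0, "a", "v"));

        // Partition id first, so the reduce side can read its partition
        // sequentially; key second, so records inside a partition are sorted.
        records.sort(Comparator.comparingInt(SpillRecord::partitionId)
                               .thenComparing(SpillRecord::key));

        // Prints: 0/a, 0/c, 1/b
        records.forEach(r -> System.out.println(r.partitionId() + "/" + r.key()));
    }
}
```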

The reduce pulls the records of its partition from each map, remotely or locally; each fetched piece is called a MapOutput. Normally the data is kept in memory, but if memory exceeds the threshold the records are written to disk. The reduce then merges the MapOutputs using min-heap k-way merge sorting to obtain globally sorted records. During the merge, intermediate results may be spilled to disk when memory exceeds the threshold, and if too many files have been spilled, additional merge rounds are triggered.

1.2 Tez

There are two cases in Tez: (1) ordered io, (2) unordered io.

Ordered io works the same as MapReduce, so we skip it here.

Unordered io is generally used for hash joins and other situations where keys do not need to be sorted. It takes a write-as-is, read-as-is approach: the map writes records directly to the file (possibly through a cache), and the reduce side can consume the data as soon as it reads it.

1.3 Spark

Spark's shuffle is more complex and more flexible. Some tasks do not require sort or combine, so Spark lets users determine the shuffle logic according to their needs.

1.3.1 Shuffle write operation

When writing shuffle data, three writers are supported:

BypassMergeSortShuffleWriter generates a temporary file for each partition. When a record is written, the corresponding partition is found and the record is written directly to that partition's temporary file. When all data has been processed, the temporary files are written into a single final file in partition order, and the temporary files are deleted.

UnsafeShuffleWriter implements its logic mainly through ShuffleExternalSorter. When a record is written, it is serialized immediately and the serialized bytes are copied into the allocated memory. At the same time, the record's address and partition are recorded in memory (inMemSorter).

When memory reaches the threshold, a spill is performed. Based on the information kept in memory (inMemSorter), the records can easily be ordered by partition and written to a file.

When all records have been processed, the records still in memory are spilled to a file. Finally, all spilled files are aggregated once: since each spilled file is already sorted by partition, the per-partition regions of all spilled files can be copied into the final file in partition order. The resulting file is sorted by partition.
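
Part of why UnsafeShuffleWriter can sort spilled data so cheaply (and the origin of the 16777216-partition limit in the comparison table below) is that the partition id is packed into the upper bits of each in-memory record pointer, so sorting plain longs orders records by partition. A simplified, hedged sketch of the idea (not Spark's exact PackedRecordPointer layout):

```java
public class PackedPointerSketch {
    // Sketch only: pack the partition id into the upper 24 bits of a long,
    // which caps the partition count at 2^24 = 16,777,216.
    static final int PARTITION_BITS = 24;
    static final int ADDRESS_BITS = Long.SIZE - PARTITION_BITS; // 40 bits left for the record address

    static long pack(int partitionId, long recordAddress) {
        return ((long) partitionId << ADDRESS_BITS)
            | (recordAddress & ((1L << ADDRESS_BITS) - 1));
    }

    static int partitionOf(long packed) {
        return (int) (packed >>> ADDRESS_BITS);
    }

    public static void main(String[] args) {
        long pointer = pack(12345, 0xCAFEL);
        // Sorting an array of packed pointers orders records by partition id,
        // which is all a spill file needs before per-partition concatenation.
        System.out.println(partitionOf(pointer)); // 12345
    }
}
```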

SortShuffleWriter mainly implements specific logic through ExternalSorter. ExternalSorter decides whether to combine and sort based on the user's needs.

When a record is written, it is inserted directly into memory. If combine is required, the in-memory structure is a map; otherwise it is a buffer.

If the estimated memory usage exceeds the threshold, a spill is triggered and the records are spilled to disk. This process requires sorting, and the comparator depends on the user's needs: if key ordering is set, records are sorted by key; if key ordering is not set but an aggregator (i.e. combine) is set, keys are sorted by their hash codes, which ensures that identical keys end up together and can be combined; if neither key ordering nor an aggregator is set, records are sorted only by partition.
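
A hedged Java sketch of that comparator selection, mirroring the rules above rather than Spark's actual Scala code in ExternalSorter:

```java
import java.util.Comparator;
import java.util.Optional;

public class SpillComparatorSelection {
    // Sketch only: choose how records are ordered inside a spill file.
    static <K> Comparator<K> keyComparator(Optional<Comparator<K>> keyOrdering,
                                           boolean hasAggregator) {
        if (keyOrdering.isPresent()) {
            // Key ordering requested: sort records by key.
            return keyOrdering.get();
        }
        if (hasAggregator) {
            // No ordering, but combine is needed: sort by the key's hash code
            // so that equal keys end up next to each other.
            return Comparator.comparingInt((K k) -> k == null ? 0 : k.hashCode());
        }
        // Neither ordering nor aggregator: records only need to be grouped
        // by partition, so no key comparator is used at all.
        return null;
    }
}
```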

When all records have been written, the spill files are read and merged into a globally ordered file.

Comparison of the three writers:

| writer | advantages | disadvantages | scene |
| --- | --- | --- | --- |
| BypassMergeSortShuffleWriter | (1) Only serialized once. (2) Uses a hashmap-like data structure, so inserting data is fast. | (1) Combine and sort are not supported. (2) Each partition generates its own temporary file, which can produce too many temporary files. | Suitable when the number of partitions is small (default: at most 200) and there is no combine. |
| UnsafeShuffleWriter | (1) Only serialized once. (2) The number of files spilled to disk is limited and no longer depends on the number of partitions, so more partitions can be supported. | (1) Combine and sort are not supported. (2) The record write order is disrupted, so supportsRelocationOfSerializedObjects is required. | Suitable when there is no combine, supportsRelocationOfSerializedObjects is true, and the number of partitions is at most 16777216. |
| SortShuffleWriter | (1) Supports combine and sort. (2) Suitable for all scenarios. (3) The number of files spilled to disk is limited. | (1) Multiple serializations are required. | Suitable for all scenarios. |

1.3.2 Shuffle read

Currently there is only one implementation, BlockStoreShuffleReader. The implementation is similar to MapReduce: the reduce pulls the records of the corresponding partition from the map, remotely or locally. Normally the data is kept in memory, but if the block to be fetched exceeds the size threshold, disk is used. Then, according to the user's needs, it is decided whether to combine or sort, and finally a record iterator is produced for the user. Combine and sort use ExternalAppendOnlyMap and ExternalSorter respectively; when memory exceeds the threshold, data is spilled to the local disk.

1.4 Summary

(1) About the semantics of each framework

MapReduce and Tez's ordered io are special cases of Spark's sorted shuffle, and Tez's unordered io is essentially a special case of Spark's unsorted shuffle. In essence, the semantics of the frameworks are the same, with Spark being the most general.

(2) Where are local disk files generated?

After analyzing the three computing frameworks, we learned that the following processes will use disks:

In fact, Uniffle has already solved (1) and (2). For (3), with properly tuned parameters it is difficult to generate disk files. So only (4) needs to be discussed in this article.

2 Plans

To address the problem that the merge on the reduce side may spill to disk, there are two main solutions:

2.2 Option 2: On-demand Merge on the Reduce side

(Figure: on_demand_merge)

Since memory on the reduce side is limited, to avoid spilling data to disk when merging on the reduce side, the reduce reads only part of the buffer of each segment when it obtains segments, and then merges all the buffers. When the partial buffer of a certain segment has been fully consumed, the next buffer of that segment is read and added to the merge. The problem is that the number of times the reduce side reads data from the ShuffleServer is approximately segments_num * (segment_size / buffer_size), which is a large value for big tasks, and too many RPCs means degraded performance.
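
For example (illustrative numbers only): with 1,000 segments of 1 GB each and a 32 MB read buffer, a single reduce would issue roughly 1000 × (1024 / 32) = 32,000 reads.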

A segment here refers to a sorted collection of records, which can be understood as a block whose records have been sorted by key.

This article chooses option 1 (remote merge on the ShuffleServer side), and the following content mainly discusses it.

3 Demand analysis

3.1 What types of tasks require remote merge?

Currently, Uniffle's map side no longer spills to disk, so this article mainly considers the reduce side. The situations are mainly divided as follows:

For sorting, the map side generally sorts its output to obtain a set of partially sorted records, called a segment here. The reduce then obtains all segments and merges them; Spark, MR, and Tez all use min-heap k-way merge sorting. The same method can still be used for remote merging.

BufferManager and FlushManager already maintain block information in memory and on disk. We only need to add a MergeManager to the ShuffleServer that merges the blocks belonging to the same shuffle to obtain globally sorted records.

Introducing sorting on the ShuffleServer has a side effect: the shuffle's KeyClass, ValueClass, and KeyComparator need to be passed to the ShuffleServer.
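
As a hedged illustration of that side effect (the class and method names below are hypothetical, not Uniffle's actual RPC or API), the client could ship the class names as strings when registering the shuffle, and the ShuffleServer would load them reflectively before merging:

```java
import java.util.Comparator;

public class RemoteMergeContext {
    // Hypothetical sketch: these would arrive as plain strings in the
    // shuffle registration request sent by the client.
    private final String keyClassName;
    private final String valueClassName;
    private final String comparatorClassName;

    public RemoteMergeContext(String keyClassName, String valueClassName,
                              String comparatorClassName) {
        this.keyClassName = keyClassName;
        this.valueClassName = valueClassName;
        this.comparatorClassName = comparatorClassName;
    }

    // On the ShuffleServer, resolve the classes before merging. A real
    // implementation would use the isolated classloader from section 5.7.
    public Class<?> loadKeyClass(ClassLoader loader) throws Exception {
        return Class.forName(keyClassName, true, loader);
    }

    public Class<?> loadValueClass(ClassLoader loader) throws Exception {
        return Class.forName(valueClassName, true, loader);
    }

    public Comparator<?> loadComparator(ClassLoader loader) throws Exception {
        return (Comparator<?>) Class.forName(comparatorClassName, true, loader)
            .getDeclaredConstructor().newInstance();
    }
}
```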

3.3 How does ShuffleServer combine?

Combine is generally a user-defined operation, so the ShuffleServer does not perform combine itself. But if combine is performed on the reduce side, doesn't that contradict our goal of avoiding spilling to disk on the task side? In fact, we do not have to use ExternalAppendOnlyMap for combine: if the records obtained from the ShuffleServer are already sorted by key, identical keys are grouped together, and only a small amount of memory is needed to combine them.
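
Because the records coming back from the ShuffleServer are already sorted by key, the combiner only needs to hold the values of the current key. A minimal, hedged sketch (hypothetical types, not the actual reader code):

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

public class StreamingCombiner {
    // Sketch only: combine a key-sorted stream while keeping just one key
    // and its running accumulator in memory.
    static <K, V> void combine(Iterator<Map.Entry<K, V>> sortedRecords,
                               BinaryOperator<V> combineFn,
                               Consumer<Map.Entry<K, V>> downstream) {
        K currentKey = null;
        V accumulator = null;
        while (sortedRecords.hasNext()) {
            Map.Entry<K, V> e = sortedRecords.next();
            if (currentKey != null && currentKey.equals(e.getKey())) {
                // Same key as before: fold the value into the accumulator.
                accumulator = combineFn.apply(accumulator, e.getValue());
            } else {
                // Key changed: the previous key is complete, emit it downstream.
                if (currentKey != null) {
                    downstream.accept(new AbstractMap.SimpleEntry<>(currentKey, accumulator));
                }
                currentKey = e.getKey();
                accumulator = e.getValue();
            }
        }
        if (currentKey != null) {
            downstream.accept(new AbstractMap.SimpleEntry<>(currentKey, accumulator));
        }
    }
}
```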

3.4 How does Writer write?

The writer writes exactly as it does today; no changes are needed on the write path.

3.5 How does Reader read?

Currently, Uniffle's shuffle reader uses the block id as the read marker, which makes it easy to verify that accurate and complete records are obtained. For remote merge, the MergeManager has merged the original block collection into a new sequence of records sorted by key, so the block ids generated on the map side can no longer be used. We therefore use a new way to read records: when the MergeManager performs the global merge, it generates an index, and the reader reads according to this index.

Note: In principle, using the key as the read index is more semantic, and the first version of the demo was implemented this way. However, that approach did not handle data skew well, so it was abandoned.

4 Scheme Design

4.1 Basic procedures for RemoteMerge

(Figure: remote_merge_structure)

The following introduces the process of Remote Merge:

4.2 Analyze the process from the perspective of Record

We can use WordCount as an example to explain the flow of records through the entire process. In this example there are two partitions and one reduce, i.e. one reduce processes the data of two partitions.

(Figure: remote_merge_from_record_perspective)

5.1 Unified serializer

Since the merge is performed on the ShuffleServer side, a unified serializer that is independent of the computing framework needs to be extracted. Two types of serializers are provided here: (1) Writable and (2) Kryo. Writable serialization handles classes that implement the org.apache.hadoop.io.Writable interface and is used by the MR and Tez frameworks. Kryo can serialize most classes and is generally used with Spark.
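
A hedged sketch of what such a framework-neutral contract could look like (the names are illustrative, not the exact interfaces of the implementation); one implementation would wrap Hadoop Writable for MR/Tez and another would wrap Kryo for Spark:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative only: a serializer contract that is independent of the
// computing framework, so the ShuffleServer can read keys and values
// without depending on MR, Tez, or Spark classes.
interface RecordSerializer {
    // Whether this serializer can handle the given key/value classes
    // (e.g. a Writable-based one accepts Writable subclasses).
    boolean accept(Class<?> keyClass, Class<?> valueClass);

    void writeKey(DataOutputStream out, Object key) throws IOException;

    void writeValue(DataOutputStream out, Object value) throws IOException;

    <T> T readKey(DataInputStream in, Class<T> keyClass) throws IOException;

    <T> T readValue(DataInputStream in, Class<T> valueClass) throws IOException;
}
```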

5.2 RecordsFileWriter/RecordFileReader

Provides abstract methods for writing and reading records.

5.3 Merger

Provides a basic merger service that merges multiple data streams by key. Min-heap k-way merge sorting is used to merge the partially sorted data streams into one sorted stream.
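
The heart of the Merger is the classic min-heap k-way merge. A self-contained, hedged sketch over generic iterators (not Uniffle's actual segment classes):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {
    // Sketch only: merge several key-sorted streams into one sorted stream
    // using a min-heap keyed on each stream's current head element.
    static <T> List<T> merge(List<Iterator<T>> sortedStreams, Comparator<T> cmp) {
        // Each heap entry holds the current element plus the stream it came from.
        class Head {
            T element;
            final Iterator<T> source;
            Head(T element, Iterator<T> source) { this.element = element; this.source = source; }
        }
        PriorityQueue<Head> heap =
            new PriorityQueue<>((a, b) -> cmp.compare(a.element, b.element));
        for (Iterator<T> stream : sortedStreams) {
            if (stream.hasNext()) {
                heap.add(new Head(stream.next(), stream));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.element);
            // Refill the heap from the stream that produced the smallest element.
            if (smallest.source.hasNext()) {
                smallest.element = smallest.source.next();
                heap.add(smallest);
            }
        }
        return merged;
    }
}
```

In the real MergeManager the output would be streamed and indexed rather than collected into a list, but the ordering logic is the same.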

5.4 MergeManager

Used to merge Records on the server side.

5.5 Tools for reading sorted data

Generally speaking, data read on the reduce side can be sent directly to the downstream computation. But there are two special situations: (1) when combine is needed during the merge, we must wait until all records of a key have arrived before combining them and sending the result downstream; (2) for Spark and Tez, the reduce side may read data from multiple partitions, so we must merge the data of multiple partitions again on the reduce side before sending it downstream. RMRecordsReader is the tool for reading sorted data. Its general structure is as follows:

(Figure: rm_records_reader)

The figure depicts a single reduce processing two partitions. A RecordsFetcher thread reads the blocks of its partition, parses them into records, and sends them to the combineBuffer. The RecordCombiner reads records from the combineBuffer and, once all records of a key have been collected, performs the combine and sends the result to the mergedBuffer. The RecordMerge stage takes all mergedBuffers and merges and sorts them again in memory, finally producing the globally sorted result for the downstream.

5.6 Framework adaptation

Compatible with MR, Tez, and Spark architectures.

I have used online applications to conduct large-scale stress testing for MR and Tez. For Spark, only some basic examples have been tested so far, and a lot more testing is still needed.

5.7 Isolated classloader

Different versions of the key class, value class, and comparator class are loaded with isolated classloaders.
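
A hedged sketch of the kind of child-first classloader this implies (a simplified illustration, not the actual Uniffle implementation):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstClassLoader extends URLClassLoader {
    // Sketch only: resolve classes from the user-provided jars first and fall
    // back to the parent (shuffle server) classloader only when not found,
    // so different versions of key/value/comparator classes do not clash.
    public ChildFirstClassLoader(URL[] userJars, ClassLoader parent) {
        super(userJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            if (name.startsWith("java.") || name.startsWith("javax.")) {
                // JDK classes must always come from the parent.
                return super.loadClass(name, resolve);
            }
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                try {
                    // Child-first: look in the user jars before delegating upward.
                    loaded = findClass(name);
                } catch (ClassNotFoundException e) {
                    loaded = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }
}
```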

Chinese version document: https://zhengchenyu.github.io/2023/12/25/RSS-%E8%BF%9C%E7%A8%8BMerge%E7%9A%84%E8%AE%BE%E8%AE%A1/