Open revans2 opened 1 year ago
Actually I have some new information, which might help us a lot in all of the cases, but especially the case when we had to spill data to disk and there were too many partitions to keep the file handles open.
All of the compression codecs that Spark supports can be concatenated after the fact.
Also, the default checksum used by Spark (adler32 or NONE, depending on the version of Spark we are on) can be calculated in a distributed way. (The checksum is part of the message handed back to the driver when the shuffle data is committed, and it is written out to the metadata file as well.)
For adler32 I need to do a bit more research but CERN did it so we should be able to do it too. http://cds.cern.ch/record/2673802/files/ATL-DAQ-PROC-2019-002.pdf
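To make the "distributed" adler32 idea concrete, here is a minimal sketch of combining two independently computed Adler-32 values into the checksum of the concatenated data. It is derived directly from the Adler-32 definition and is not taken from the CERN paper or from Spark; the class and method names (`Adler32Combine`, `combine`) are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;

final class Adler32Combine {
    // Largest prime below 2^16; this is the Adler-32 modulus.
    private static final long BASE = 65521L;

    /** Adler-32 of a byte array, using the JDK implementation. */
    static long adler32(byte[] data) {
        Adler32 adler = new Adler32();
        adler.update(data, 0, data.length);
        return adler.getValue();
    }

    /**
     * Checksum of (first ++ second), given only the two part checksums and
     * the length in bytes of the second part.
     *
     * Adler-32 keeps a = 1 + sum(bytes) and b = sum of every intermediate a,
     * both mod BASE. Starting the second part from a1 instead of 1 shifts
     * each of its len2 intermediate sums by (a1 - 1).
     */
    static long combine(long adler1, long adler2, long len2) {
        long a1 = adler1 & 0xFFFFL;
        long b1 = (adler1 >>> 16) & 0xFFFFL;
        long a2 = adler2 & 0xFFFFL;
        long b2 = (adler2 >>> 16) & 0xFFFFL;
        long rem = len2 % BASE; // keeps the product below well within a long
        long a = Math.floorMod(a1 + a2 - 1, BASE);
        long b = Math.floorMod(b1 + b2 + rem * (a1 - 1), BASE);
        return (b << 16) | a;
    }

    public static void main(String[] args) {
        byte[] first = "hello ".getBytes(StandardCharsets.UTF_8);
        byte[] second = "world".getBytes(StandardCharsets.UTF_8);
        byte[] whole = "hello world".getBytes(StandardCharsets.UTF_8);
        long combined = combine(adler32(first), adler32(second), second.length);
        System.out.println(combined == adler32(whole));
    }
}
```

Only the per-chunk checksum and the chunk length need to be kept, so each chunk can be checksummed on whichever thread compressed it.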
The only problem shows up if encryption is enabled or if the checksum is CRC32. So we will need to support the algorithm I described originally for when encryption is enabled but we might be able to have a very fast common case for shuffle. If creating a compression output stream is cheap enough we should be able to compress each batch as we see it come in, in parallel in a thread pool along with a checksum for the data. We can decide if that should stay in memory or go out to disk directly, but I think keeping it in memory might be the best. Then if we need to spill we can write it out to disk and keep a pointer to where it is along with the checksum.
If the checksum is CRC32 we can still do the compression in parallel, but we would have to calculate the checksum serially on the resulting data.
When we need to write out the final data we can move the compressed data to the proper location in the final file and start combining checksums together, if needed.
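A rough sketch of the fast path described above: compress each batch independently on a thread pool, then concatenate the compressed chunks in order. GZIP is used here only as a stand-in, because it ships with the JDK and its streams can be concatenated; Spark's own codecs are not used in this example. All names (`ParallelBatchCompress`, `compressAndConcat`) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class ParallelBatchCompress {
    /** Compresses one batch into a complete, self-contained GZIP member. */
    static byte[] compress(byte[] batch) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
                gz.write(batch);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compresses every batch on a thread pool and concatenates the results in input order. */
    static byte[] compressAndConcat(List<byte[]> batches, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<byte[]>> pending = new ArrayList<>();
            for (byte[] batch : batches) {
                pending.add(pool.submit(() -> compress(batch)));
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (Future<byte[]> future : pending) {
                byte[] chunk = future.get(); // waits, so output order matches input order
                out.write(chunk, 0, chunk.length);
            }
            return out.toByteArray();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Reads the concatenated chunks back as a single stream. */
    static byte[] decompress(byte[] concatenated) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(concatenated))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The reader sees one logical stream even though no single compressor ever saw all of the data, which is the property the final merge step would rely on.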
The details of the output stream used are:
SerializationStream(                              // Converts the objects into bytes
  CompressionOutputStream(                        // Optional, compresses the data
    ErrorHandlingOutputStream(CryptoOutputStream( // Optional, encrypts the data
      ManualCloseOutputStream(                    // Does a flush instead of a close; the close is separate
        BufferedOutputStream(
          ChecksumOutputStream(                   // Optional, if empty none is set
            TimeTrackingOutputStream(             // Used for metrics
              FileOutputStream
            )
          )
        )
      )
    ))
  )
)
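One consequence of this nesting is that the checksum sits below the compression layer, so it covers the compressed bytes rather than the original records. The sketch below imitates a small part of the stack with JDK classes only (GZIP as a stand-in codec, `CheckedOutputStream` in place of the checksum stream, and an in-memory sink in place of the file). It is an illustration under those assumptions, not Spark's code, and `StreamStackSketch` is a hypothetical name.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.GZIPOutputStream;

final class StreamStackSketch {
    /** The bytes that reached the sink and the checksum taken just above it. */
    static final class Written {
        final byte[] sinkBytes;
        final long checksum;

        Written(byte[] sinkBytes, long checksum) {
            this.sinkBytes = sinkBytes;
            this.checksum = checksum;
        }
    }

    static Written write(byte[] records) {
        // In-memory sink standing in for the FileOutputStream at the bottom of the stack.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        CheckedOutputStream checked = new CheckedOutputStream(sink, new Adler32());
        // Same order as above: compression, then buffering, then checksum, then sink.
        try (OutputStream top = new GZIPOutputStream(new BufferedOutputStream(checked))) {
            top.write(records);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Written(sink.toByteArray(), checked.getChecksum().getValue());
    }

    public static void main(String[] args) {
        Written w = write("some shuffle records".getBytes());
        Adler32 overSink = new Adler32();
        overSink.update(w.sinkBytes, 0, w.sinkBytes.length);
        System.out.println(overSink.getValue() == w.checksum);
    }
}
```

This is why a per-batch checksum has to be computed after compression if the per-batch chunks are later stitched together.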
A few things to keep in mind.
First, if spark.file.transferTo is enabled, there is no encryption, and the compression codec can be concatenated, then Spark will use NIO (file channels) to transfer the data to the final location. Apparently there are some bugs in old versions of Linux that make this not work. I don't think we support any version of Linux with this bug in it (2.6.32 according to the error message), so we might be able to always do the faster transfer.
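For reference, a minimal sketch of the NIO style of transfer using `FileChannel.transferTo`, which copies a byte range from one file into another without routing the data through a user-space buffer where the platform supports it. The class name, method names, and file contents are hypothetical; this is not Spark's implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {
    /** Copies bytes [srcOffset, srcOffset + length) of src into dst starting at dstOffset. */
    static long copySegment(Path src, long srcOffset, long length, Path dst, long dstOffset) {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            out.position(dstOffset);
            long copied = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (copied < length) {
                long n = in.transferTo(srcOffset + copied, length - copied, out);
                if (n <= 0) {
                    break; // source is shorter than the requested range
                }
                copied += n;
            }
            return copied;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Small self-check on temp files; returns the final contents of the destination. */
    static String demo() {
        try {
            Path src = Files.createTempFile("spill", ".bin");
            Path dst = Files.createTempFile("final", ".bin");
            try {
                Files.write(src, "0123456789".getBytes(StandardCharsets.US_ASCII));
                Files.write(dst, "AB".getBytes(StandardCharsets.US_ASCII));
                copySegment(src, 3, 4, dst, 2); // place "3456" right after "AB"
                return new String(Files.readAllBytes(dst), StandardCharsets.US_ASCII);
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the destination offset is explicit, several spill segments could in principle be placed into the final file in any order, as long as the offsets are known up front.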
Second, spark.shuffle.sync is off by default, but if it is enabled then we need to make sure that the data hits the disk before we go on with shuffle (or we need to fall back to the old shuffle code).
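As an illustration of what "hits the disk" means at the JDK level, the sketch below forces the file contents to the storage device with `FileDescriptor.sync()` before the stream is closed. The `syncWrites` flag is a hypothetical stand-in for the configuration value, and `SyncWriteSketch` is not a Spark class.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class SyncWriteSketch {
    /** Writes data to file; when syncWrites is true, blocks until the OS reports it is on disk. */
    static void write(File file, byte[] data, boolean syncWrites) {
        try (FileOutputStream fos = new FileOutputStream(file)) {
            fos.write(data);
            fos.flush();
            if (syncWrites) {
                fos.getFD().sync(); // throws SyncFailedException if the sync cannot be guaranteed
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Small self-check on a temp file; returns the number of bytes read back. */
    static int demo() {
        try {
            File tmp = File.createTempFile("shuffle", ".data");
            try {
                write(tmp, new byte[] {1, 2, 3, 4}, true);
                return Files.readAllBytes(tmp.toPath()).length;
            } finally {
                tmp.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Any path that moves data with file channels instead would need an equivalent step, for example `FileChannel.force(true)`, before reporting the write as complete.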
The reason I am bringing this up is that in order to make this work we are likely going to have to copy over a lot more code from Spark at a lower level for shuffle. We also are going to have to be much more careful about auditing any shuffle changes while we are at it.
Is your feature request related to a problem? Please describe.
The current Spark shuffle has two modes. For tasks with 200 or fewer partitions (configurable via spark.shuffle.sort.bypassMergeThreshold), it writes the shuffle data out to a temp file per partition, and then in a second pass it merges the files together in sorted order. For larger numbers of shuffle partitions, the data is sorted by shuffle partition and written out in sorted order. This might involve spilling if the data is too large. The problem is that there is no graceful transition between the two. If I run NDS scale factor 3k on Dataproc with 256 shuffle partitions, without the multi-threaded shuffle, I see a 4.7% total run time difference between the two approaches. Going from 200 to 201 partitions is likely to cause a large performance regression.
Also we cannot just configure the shuffle to always use the merge-bypass shuffle. It uses much more system resources when the number of partitions is high. For example the same NDS 3k runs show a 5.88% performance loss when using merge-bypass shuffle at 768 partitions. Going to 896 partitions caused NDS to fail.
Our current multi-threaded shuffle implementation only deals with the 200 partitions or smaller use case. It is really a multi-threaded merge-bypass shuffle. This is likely to compound the performance issues, and it is not clear to an end user that there is a lot more performance to be had if they need a few more shuffle partitions. It is also not clear to the end user that the multi-threaded shuffle is not used when the number of partitions gets too high. It would really be nice to have a way for the performance to transition gracefully, and to have an accelerated version of the sort based shuffle as well.
In addition to this for larger jobs and larger clusters we are likely to want to have more partitions.
It would really be even better if we could optimize for some common cases when a single "batch" of data is output by a task.
Describe the solution you'd like
I am not 100% sure what we want to do. I have a few ideas, but we need to do some experiments to find out what might and what might not work.
When evaluating a shuffle implementation we need to think about a few factors.
So if we evaluate the various shuffles it can help to show what is happening here.
The buffer size is managed by Spark and can get rather complicated, but it is by default at most (heap - 300 MiB) * 0.6, and it is shared by all tasks.
From this I have a few ideas that would be nice to play with and test out.
The sort in Spark is really designed around a lot of small rows. It is also designed to be generic for all types of RDDs, where the key may not be the partition. But we typically have a small number of large-ish sub-batches. Also, the number of partitions is going to be something we can reasonably handle in memory (not the data, but the partitions). We also have a clear separation between the partition key and the data. It would be interesting to see if we could take a hybrid approach here.
We keep the data in memory, but we "sort" it by keeping a List<LinkedList<SubShuffleBatch>> or something like that. The index into the top-level list is the partition, and we can tack more sub-batches onto each partition as we see them. This keeps the data in the same order it was output by the task. That is not strictly a requirement, but it would be nice.
When we run out of memory and need to spill, we can compress different partitions in parallel and write them out to disk (up to N codecs and file handles, where N is configurable but probably 200 by default), and then replace them in memory with pointers to where they were written. So for example, if I had 400 shuffle partitions to spill and 200 codecs/files to use, I would compress the even ones first and write them out at the beginning of the 200 files, then I would create new compression codecs and do the odd partitions, writing them after the even ones.
When all of the data is added we can do a final pass and decide how to build up the final output file.
For any partition where all of the data for it is fully in memory, we can compress them in parallel and write them out at the appropriate time to the final location in the file. This would hopefully help with the common case of a small shuffle by not having any read or write amplification.
For any partition where all of its data was written out to a file in a single chunk, we can move it just like we do today for the multi-threaded shuffle. We might not be able to send an entire file, but we can send a large chunk of it.
For any partition where the data is not all in a single location, we are going to have to read it back in and compress it in order. We can do some compression in multiple threads, but we are going to have to pay attention to memory usage and buffer sizes. If we are okay with some write amplification, we can write some number of them in parallel to different files to get the final size and then move them to the final location.
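To make the hybrid idea easier to discuss, here is a minimal sketch of the per-partition buffer, the spill ordering from the 400-partition example, and the three final-pass cases listed above. Everything here is an assumption made for illustration: the fields of `SubShuffleBatch`, the `FinalPass` enum, and the reading of "even ones first, then odd ones" as `wave = partition % stride` with `file = partition / stride`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class HybridShuffleBuffer {
    /** One piece of a partition: bytes still in memory, or a pointer into a spill file. */
    static final class SubShuffleBatch {
        final byte[] data;   // non-null only while the batch is in memory
        final int spillFile; // -1 while the batch is in memory
        final long offset;
        final long length;

        private SubShuffleBatch(byte[] data, int spillFile, long offset, long length) {
            this.data = data;
            this.spillFile = spillFile;
            this.offset = offset;
            this.length = length;
        }

        static SubShuffleBatch inMemory(byte[] data) {
            return new SubShuffleBatch(data, -1, 0L, data.length);
        }

        static SubShuffleBatch spilled(int spillFile, long offset, long length) {
            return new SubShuffleBatch(null, spillFile, offset, length);
        }

        boolean isInMemory() {
            return data != null;
        }
    }

    enum FinalPass { COMPRESS_FROM_MEMORY, MOVE_SINGLE_CHUNK, REREAD_AND_RECOMPRESS }

    // Index = partition id; each list keeps sub-batches in arrival order.
    private final List<LinkedList<SubShuffleBatch>> partitions = new ArrayList<>();

    HybridShuffleBuffer(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new LinkedList<>());
        }
    }

    void append(int partition, SubShuffleBatch batch) {
        partitions.get(partition).addLast(batch);
    }

    /** Number of partitions that share one spill file (ceiling division). */
    static int stride(int numPartitions, int maxFiles) {
        return (numPartitions + maxFiles - 1) / maxFiles;
    }

    /** Which round of codecs handles this partition; wave 0 is written first. */
    static int spillWave(int partition, int numPartitions, int maxFiles) {
        return partition % stride(numPartitions, maxFiles);
    }

    /** Which spill file this partition lands in. */
    static int spillFile(int partition, int numPartitions, int maxFiles) {
        return partition / stride(numPartitions, maxFiles);
    }

    /** Picks one of the three final-pass strategies for a partition. */
    FinalPass classify(int partition) {
        List<SubShuffleBatch> batches = partitions.get(partition);
        boolean allInMemory = true;
        for (SubShuffleBatch batch : batches) {
            if (!batch.isInMemory()) {
                allInMemory = false;
            }
        }
        if (allInMemory) {
            return FinalPass.COMPRESS_FROM_MEMORY;
        }
        if (batches.size() == 1) {
            return FinalPass.MOVE_SINGLE_CHUNK;
        }
        return FinalPass.REREAD_AND_RECOMPRESS;
    }
}
```

With 400 partitions and 200 files, partitions 0 and 1 both map to file 0, with partition 0 written in the first wave and partition 1 appended in the second, so each spill file stays in partition order.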
We would have to do some profiling and testing to see what is best and especially on what kind of disks.