We have discovered some places where the IO buffer is not efficiently used in the unsafe code path.
There are also some places in which we could reduce the number of memory copies. Here are the details:
In SplashUnsafeSorter.writeSortedFile, we used a writerBuffer to hold the serialized data and then wrote the content of this buffer to the output stream. To avoid the second copy during the write, we created our own SplashBufferedOutputStream, which exposes its internal buffer so that the serializer can fill it with serialized data directly. This also saves the memory used by the original writerBuffer and improves the testability of the buffer mechanism. Unit tests are added for SplashBufferedOutputStream to make sure we manage the buffer correctly.
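The idea can be sketched as follows. This is a minimal illustration, not the actual Splash implementation; the class and method names (`ExposedBufferOutputStream`, `getBuffer`, `advance`) are hypothetical:

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch of a buffered output stream that exposes its internal buffer,
// so a serializer can write into it directly instead of serializing to
// a separate writerBuffer and copying that into the stream.
public class ExposedBufferOutputStream extends OutputStream {
    private final OutputStream out;
    private final byte[] buffer;
    private int count = 0;

    public ExposedBufferOutputStream(OutputStream out, int size) {
        this.out = out;
        this.buffer = new byte[size];
    }

    /** Expose the internal buffer so callers can fill it directly. */
    public byte[] getBuffer() { return buffer; }

    /** Bytes of free space left in the internal buffer. */
    public int spaceLeft() { return buffer.length - count; }

    /** Record how many bytes the caller wrote into the exposed buffer. */
    public void advance(int bytesWritten) throws IOException {
        count += bytesWritten;
        if (count == buffer.length) {
            flushBuffer();
        }
    }

    @Override
    public void write(int b) throws IOException {
        if (count == buffer.length) {
            flushBuffer();
        }
        buffer[count++] = (byte) b;
    }

    private void flushBuffer() throws IOException {
        if (count > 0) {
            out.write(buffer, 0, count);
            count = 0;
        }
    }

    @Override
    public void flush() throws IOException {
        flushBuffer();
        out.flush();
    }
}
```

A unit test can then fill the exposed buffer directly, call `advance`, and verify the bytes that reach the underlying stream, which is what makes the buffer mechanism easy to test in isolation.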
Replace IOUtils.copy with SplashUtils.copy. This function borrows most of its code from IOUtils.copy; the only difference is that it lets the caller specify the size of the buffer. In previous tests, we identified some 4K IO requests issued by IOUtils.copy, because that function uses a fixed 4K IO buffer. This is neither efficient nor elastic in a shared or distributed file system. The buffer size now follows the same Spark configuration, spark.shuffle.file.buffer. What's more, since we already have this IO buffer, we can use InputStream and OutputStream directly instead of their buffered versions, which saves more memory. And since the copy procedure is executed in a single thread, we can safely reuse the same buffer across copies, which reduces GC time.
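The shape of such a copy helper is roughly the following. This is a hedged sketch modeled on IOUtils.copy rather than the actual SplashUtils code; the class name `CopyHelper` is hypothetical:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch of a copy routine with a caller-supplied buffer, so the IO
// request size can follow spark.shuffle.file.buffer instead of the
// fixed 4K buffer allocated inside IOUtils.copy.
public final class CopyHelper {
    private CopyHelper() {}

    public static long copy(InputStream in, OutputStream out, byte[] buffer)
            throws IOException {
        long total = 0;
        int n;
        // Reads happen in chunks of buffer.length, so there is no need
        // to wrap the streams in BufferedInputStream/BufferedOutputStream.
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }
}
```

Because the buffer is passed in by the caller, a single-threaded copy loop can allocate it once and reuse it for every copy, instead of allocating a fresh 4K array per call.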