openucx / sparkucx

A high-performance, scalable and efficient ShuffleManager plugin for Apache Spark, utilizing UCX communication layer
https://www.sparkucx.org/
BSD 3-Clause "New" or "Revised" License

Public Shuffle Transport API. #23

Closed petro-rudenko closed 4 years ago

petro-rudenko commented 4 years ago

Public Spark Shuffle Transport API. @abellina pls review

abellina commented 4 years ago

One thing I am having an issue with is metadata blocks. At write time, I do not know which slice the reader is going to want, so I could either send the reader extra metadata for reduceIds or mapIds it doesn't care about, or receive N metadata blocks, where N (the number of metadata buffers) is proportional to the number of blocks needed. In the rapids plugin we have request/response semantics, so I can ask for N blocks and receive a single metadata response that has all the info for those blocks (1 metadata buffer instead of N, 1 receive instead of N).

I don't have a good solution yet for this one. If you have ideas...

petro-rudenko commented 4 years ago

So that's why we want two methods: fetchBlocksByBlockId, where you don't know the block size, and fetchBlocksByCookies, where you know the exact block size. For the first one, we may change the parameter from memory: UcxMemoryBlock to allocator: (Int) => UcpMemoryBlock
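A minimal sketch of those two call shapes, assuming the allocator-based signature proposed above. The names MemoryBlock, Cookie, and InMemoryTransport are illustrative stand-ins, not the actual sparkucx types:

```scala
import java.nio.ByteBuffer

// Hypothetical stand-ins for the transport's memory and cookie types.
case class MemoryBlock(buffer: ByteBuffer) {
  def size: Int = buffer.capacity()
}
case class Cookie(address: Long, length: Int)

trait ShuffleTransport {
  // Size unknown up front: the transport calls back into `allocator`
  // once the remote block size is known.
  def fetchBlockByBlockId(blockId: String, allocator: Int => MemoryBlock): MemoryBlock
  // Exact size known from the cookie: the caller pre-allocates `dst`.
  def fetchBlockByCookie(cookie: Cookie, dst: MemoryBlock): MemoryBlock
}

// Toy in-memory transport, just to show the two call shapes.
class InMemoryTransport(byId: Map[String, Array[Byte]],
                        byAddress: Map[Long, Array[Byte]]) extends ShuffleTransport {
  def fetchBlockByBlockId(blockId: String, allocator: Int => MemoryBlock): MemoryBlock = {
    val data = byId(blockId)
    val block = allocator(data.length) // size discovered only at fetch time
    block.buffer.put(data)
    block.buffer.flip()
    block
  }
  def fetchBlockByCookie(cookie: Cookie, dst: MemoryBlock): MemoryBlock = {
    dst.buffer.put(byAddress(cookie.address), 0, cookie.length)
    dst.buffer.flip()
    dst
  }
}

val payload = "shuffle-bytes".getBytes("UTF-8")
val transport = new InMemoryTransport(Map("shuffle_0_1_2" -> payload),
                                      Map(0x1000L -> payload))

// Path 1: allocator-driven fetch, size unknown to the caller.
val a = transport.fetchBlockByBlockId("shuffle_0_1_2",
                                      n => MemoryBlock(ByteBuffer.allocate(n)))
// Path 2: cookie-driven fetch into a pre-allocated block of known size.
val b = transport.fetchBlockByCookie(Cookie(0x1000L, payload.length),
                                     MemoryBlock(ByteBuffer.allocate(payload.length)))
```

The allocator variant lets the caller decide where the bytes land (heap, pooled, or registered memory) without knowing sizes in advance, which is the motivation for swapping the memory parameter for a function.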

abellina commented 4 years ago

The size of the block is not the problem; it's the fact that I can't perform a request. Some readers (like the skew readers) want a custom slice of the map output, so they express an intent to the server to include only certain blocks in the response. This means that metadata blocks can be batched together in odd ways, and it seems our current implementation in the rapids plugin handles this better, since we can ask for any set of blocks in one go and get one buffer with contiguously written bytes describing all metadata for those blocks.

For now, I am going to either request N block metas or try to group them by mapper or reducer id. That means either more messaging or larger metadata.
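The request/response batching described above can be sketched with a hypothetical wire format: the reader asks for N blocks in one request, and the server answers with one contiguous metadata buffer instead of N separate ones. BlockMeta and the fixed-width layout here are illustrative, not the rapids plugin's actual format:

```scala
import java.nio.ByteBuffer

// Hypothetical per-block metadata entry and its serialized size.
case class BlockMeta(mapId: Int, reduceId: Int, offset: Long, length: Long)
val EntryBytes = 4 + 4 + 8 + 8 // mapId, reduceId, offset, length

// Server side: write all requested entries contiguously into one buffer.
def packResponse(metas: Seq[BlockMeta]): ByteBuffer = {
  val buf = ByteBuffer.allocate(4 + metas.length * EntryBytes)
  buf.putInt(metas.length)
  metas.foreach { m =>
    buf.putInt(m.mapId).putInt(m.reduceId).putLong(m.offset).putLong(m.length)
  }
  buf.flip()
  buf
}

// Client side: one receive, then decode every entry from the one buffer.
def unpackResponse(buf: ByteBuffer): Seq[BlockMeta] = {
  val n = buf.getInt()
  (0 until n).map(_ => BlockMeta(buf.getInt(), buf.getInt(), buf.getLong(), buf.getLong()))
}

// One request for three blocks -> 1 metadata buffer, 1 receive, instead of 3.
val requested = Seq(BlockMeta(0, 5, 0L, 128L),
                    BlockMeta(1, 5, 128L, 64L),
                    BlockMeta(7, 5, 192L, 256L))
val response = packResponse(requested)
val decoded  = unpackResponse(response)
```

The point of the sketch is the cardinality: any arbitrary set of (mapId, reduceId) blocks, including the skew reader's custom slices, collapses into a single response buffer.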

abellina commented 4 years ago

https://github.com/openucx/sparkucx/pull/23/commits/e86f198d3aa3d91de3690592ff150c2e17c7ff80

Curious why we needed to remove doneWithMemory?

petro-rudenko commented 4 years ago

As discussed with @yosefe, there's no need for a doneWithMemory method, since there's a public mutate method for changing memory, which takes a callback that fires when it's safe to remove the old block.
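A minimal sketch of that mutate-with-callback idea, under the assumption that the callback must not fire while any in-flight operation still references the old buffer. RegisteredBlock and its method names are hypothetical, not the actual sparkucx API:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical registered-memory wrapper: mutate() swaps the buffer and
// defers the release callback until no operation uses the old one.
class RegisteredBlock(initial: ByteBuffer) {
  private var current: ByteBuffer = initial
  private var inFlight = 0 // operations still reading the current buffer
  private val pendingReleases = mutable.Queue.empty[() => Unit]

  def startOp(): Unit = synchronized { inFlight += 1 }

  def finishOp(): Unit = synchronized {
    inFlight -= 1
    // Last user done: now it is safe to run any deferred release callbacks.
    if (inFlight == 0) pendingReleases.dequeueAll(_ => true).foreach(cb => cb())
  }

  // Swap in a new buffer. The callback fires only once no in-flight
  // operation still references the old buffer, telling the caller
  // exactly when it may free or deregister it.
  def mutate(newBuffer: ByteBuffer)(onOldBlockReleased: () => Unit): Unit = synchronized {
    current = newBuffer
    if (inFlight == 0) onOldBlockReleased()
    else pendingReleases.enqueue(onOldBlockReleased)
  }
}

var released = false
val block = new RegisteredBlock(ByteBuffer.allocate(16))
block.startOp()                                      // a transfer is using the old buffer
block.mutate(ByteBuffer.allocate(32))(() => released = true)
val releasedWhileInFlight = released                 // still false: op in flight
block.finishOp()                                     // last user done -> callback fires
```

This shows why a separate doneWithMemory is redundant: the mutate callback already marks the exact point at which the old block can be reclaimed.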