EnricoMi opened 7 months ago
One concern is that it may occupy too much memory if we use a `Map` to store block ids instead of a bitmap.
The shuffle servers do not need to store all individual shuffle result block ids, but only the number of blocks per partition and map id. An array of integers would be more compact than a `Map`: a shuffle from 1000 to 1000 partitions would occupy less than 4 MB of memory (1000 × 1000 counts × 4 bytes).
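A minimal sketch of that bookkeeping, assuming a plain 2D `int` array indexed by partition id and map task (all names and sizes here are illustrative, not taken from the proposal):

```java
// Sketch only: per-partition, per-map block counts instead of a Map of
// individual block ids. Names and dimensions are illustrative.
public class BlockCountMemory {
  public static void main(String[] args) {
    int partitions = 1000;
    int maps = 1000;

    // One int per (partition, map) pair is enough to know how many blocks exist:
    // 1000 * 1000 * 4 bytes ~= 4 MB, independent of how many blocks each task wrote.
    int[][] blockNums = new int[partitions][maps];
    blockNums[7][42]++;  // map task 42 registered one more block for partition 7

    long bytes = (long) partitions * maps * Integer.BYTES;
    System.out.println("approx. payload size: " + bytes + " bytes"); // 4,000,000
  }
}
```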
Code of Conduct
Search before asking
Describe the proposal
The block id encodes the task attempt id, the partition id, and the sequence number of a shuffle block. Block data are stored along with this block id, while some of this information (the task attempt id) is also stored explicitly. For reading, interfaces use block ids, the partition id encoded in the block id, and explicit task attempt ids to identify relevant blocks. Reading blocks explicitly by their block ids therefore implicitly reads particular (partition id, task attempt id, sequence number) tuples.
Further, the block id is 63 bits long, while the task attempt id and the partition id are each 31 bits long. This poses a conflict, as bits have to be balanced between the task attempt id, the partition id and the sequence number. There are potential situations where one of these bit widths is insufficient, which kills an application using the shuffle service.
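To make the trade-off concrete, here is a small illustration with one assumed split of the 63 bits (the concrete widths are an example only, not taken from this proposal):

```java
// Illustration only: one possible split of the 63 block id bits.
// The concrete widths are assumptions for this example; the point is that
// the three widths must sum to 63 and therefore limit each other.
public class BlockIdBits {
  public static void main(String[] args) {
    int sequenceNoBits = 18, partitionIdBits = 24, taskAttemptIdBits = 21; // 18 + 24 + 21 = 63
    System.out.println("max sequence number: " + ((1L << sequenceNoBits) - 1));    //    262,143
    System.out.println("max partition id:    " + ((1L << partitionIdBits) - 1));   // 16,777,215
    System.out.println("max task attempt id: " + ((1L << taskAttemptIdBits) - 1)); //  2,097,151
    // Giving more bits to one field takes them from another; exceeding any of
    // these limits kills the application that uses the shuffle service.
  }
}
```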
The block id should be refactored to simplify interfaces, to make reading blocks explicit, and to avoid potential situations where bits are insufficient.
The block id is effectively a triple of (task attempt id, partition id, sequence number), truncated and encoded in a `long`. Given that each block id is stored together with its task attempt id, we can make the block id a triple of (map index, partition id, sequence number). Since blocks are read given the set of task attempt ids that wrote the desired map index, we can reduce the bit footprint of the block id.
The structure of the block id should then be made explicit:
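A minimal sketch of what such an explicit structure could look like, assuming a simple immutable value class (the exact shape is not prescribed here; the constructor follows the task list below):

```java
// Sketch only: a possible explicit block id structure holding the triple
// (map index, partition id, sequence number) proposed above.
public final class BlockId {
  public final int mapIndex;     // replaces the task attempt id encoded in the long
  public final int partitionId;
  public final int sequenceNo;

  public BlockId(int mapIndex, int partitionId, int sequenceNo) {
    this.mapIndex = mapIndex;
    this.partitionId = partitionId;
    this.sequenceNo = sequenceNo;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof BlockId)) return false;
    BlockId other = (BlockId) o;
    return mapIndex == other.mapIndex
        && partitionId == other.partitionId
        && sequenceNo == other.sequenceNo;
  }

  @Override
  public int hashCode() {
    return 31 * (31 * mapIndex + partitionId) + sequenceNo;
  }

  @Override
  public String toString() {
    return "BlockId{map=" + mapIndex + ", partition=" + partitionId + ", seq=" + sequenceNo + "}";
  }
}
```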
The block id is often used in large sets, e.g. when registering written blocks and when reading them back.
A task attempt can register blocks using a `Map<Integer, Integer>`, which maps a partition id to the number of blocks (#1399). This should also reduce the memory footprint of storing registered block ids on the shuffle server (or the Spark driver, #1538).
Reading blocks should be as easy as reading the blocks written by a set of task attempt ids whose block ids match a `Map<Integer, Map<Integer, Integer>>`, which maps a partition id to the map indexes and their number of blocks. Given that map index and partition id come from contiguous ranges, this information can be stored efficiently as arrays: `blockNums[partitionRangeIndex][mapRangeIndex]`.
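A hedged sketch of that bookkeeping; the range start offsets and array sizes are made-up assumptions for the example:

```java
// Sketch only: registered block counts per partition id and map index,
// and the equivalent dense array indexed by range offsets.
import java.util.HashMap;
import java.util.Map;

public class RegisteredBlocks {
  public static void main(String[] args) {
    // partition id -> (map index -> number of blocks)
    Map<Integer, Map<Integer, Integer>> registered = new HashMap<>();
    registered.computeIfAbsent(7, p -> new HashMap<>()).merge(42, 3, Integer::sum);

    // Because partition ids and map indexes are contiguous ranges, the same
    // information fits into a dense array: blockNums[partitionRangeIndex][mapRangeIndex].
    int partitionRangeStart = 0, mapRangeStart = 0;  // illustrative offsets
    int[][] blockNums = new int[1000][1000];
    registered.forEach((partitionId, maps) ->
        maps.forEach((mapIndex, count) ->
            blockNums[partitionId - partitionRangeStart][mapIndex - mapRangeStart] += count));

    System.out.println(blockNums[7][42]); // 3
  }
}
```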
While reading those blocks, the reader has to maintain a set of block sequence numbers from `0` to `blockNum - 1` (#1399) for each partition id and map index to identify duplicates. This can be done via a `Map<Integer, Roaring64NavigableMap>`, where the `Roaring64NavigableMap` stores the read (map index, sequence number) tuples.
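A sketch of that de-duplication, assuming the (map index, sequence number) tuple is packed into one `long` for the bitmap; the packing and the class/method names are illustrative choices, using the RoaringBitmap library that provides `Roaring64NavigableMap`:

```java
// Sketch only: de-duplicating read blocks per partition.
import java.util.HashMap;
import java.util.Map;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class ReadDeduplication {
  // partition id -> bitmap of already seen (map index, sequence number) tuples
  private final Map<Integer, Roaring64NavigableMap> seen = new HashMap<>();

  /** Returns true the first time a (partition, map, sequence) block is seen. */
  public boolean markRead(int partitionId, int mapIndex, int sequenceNo) {
    long tuple = ((long) mapIndex << 32) | (sequenceNo & 0xFFFFFFFFL);
    Roaring64NavigableMap bitmap =
        seen.computeIfAbsent(partitionId, p -> new Roaring64NavigableMap());
    if (bitmap.contains(tuple)) {
      return false; // duplicate block, skip it
    }
    bitmap.addLong(tuple);
    return true;
  }
}
```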
The `BufferSegment` stores the block id (`long`) and already the task attempt id (`long`). Reworking the block id increases the size of `BufferSegment` by only 32 bits: the 64-bit block id `long` turns into three 32-bit `int`s.
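To make the size argument concrete, a sketch showing only the two fields discussed here (the real `BufferSegment` has further fields, omitted because they are unaffected):

```java
// Sketch only: the fields relevant to the size argument, before and after.
class BufferSegmentBefore {
  long blockId;        // 8 bytes
  long taskAttemptId;  // 8 bytes  -> 16 bytes for these two fields
}

class BufferSegmentAfter {
  int mapIndex;        // 4 bytes
  int partitionId;     // 4 bytes
  int sequenceNo;      // 4 bytes
  long taskAttemptId;  // 8 bytes  -> 20 bytes, i.e. 32 bits more than before
}
```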
Primary objective: refactor the block id into an explicit (map index, partition id, sequence number) structure, so that interfaces become simpler, reading blocks becomes explicit, and no bits have to be balanced between its components.
Benefits:
- Registered block ids occupy less memory on the shuffle server and the Spark driver (#1538).
- Reading blocks becomes explicit and interfaces become simpler.
- The size of `BufferSegment` grows by only 32 bits.
Task list
- Register blocks via `Map<partitionId, blockNums>`
- Replace `taskAttemptId` with `mapIndex` in the block id
- Use `BlockId` instead of `long` everywhere
- Turn `BlockId(long blockId)` into `BlockId(int mapIndex, int partitionId, int sequenceNo)`
Are you willing to submit PR?