Bears-R-Us / arkouda

Arkouda (αρκούδα): Interactive Data Analytics at Supercomputing Scale :bear:
Other
240 stars 87 forks source link

Reduce sort overheads #1404

Open ronawho opened 2 years ago

ronawho commented 2 years ago

The core RadixSortLSD routine has some start up overheads that can be pretty dramatic, especially at scale. At 75K cores we have seen 20-25 second of start up time. For large sorts this is typically amortized, but this start up cost can be painful for small sorts and when doing strong scaling (running a fixed problem size that you want to get faster with more nodes.) In this issue, we'd like to explore some of the overheads and see what we can do to reduce them.

There are a couple of known start up costs for RadixSortLSD including algorithmic overhead, aggregator creation time, and dynamic memory registration costs (at least under gasnet-ibv). Going into this, I was assuming most of the overhead would be algorithmic, but that doesn't appear to be the whole story. The following table has results from some initial explorations with dynamic and static registration at varying number of tasks. This is with sorting 2**15 elements per node on a 32 node InfiniBand system with 128 cores per node.

Tasks Dynamic Static
1 0.26s 0.15s
128 2.99s 0.59s
2048 60.79s 6.81s

Run details (with subtimers from https://github.com/Bears-R-Us/arkouda/compare/master...ronawho:sort-subtimers-2):

gasnet-ibv-large (dynamic registration): ``` CHPL_FLAGS="--gasnet-segment=large -sperfOnlyCompile -sperfValRange='uint(32)'" make TEST_BINARY_DIR=test-bin-large test-bin-large/UnitTestSort ./test-bin-large/UnitTestSort -nl 32 --elemsPerLocale=$((2**15)) --RSLSD_numTasks=1 checkSortedT: 0.00 arrayCreateT: 0.01 countDigitsT: 0.02 globalCountsT: 0.17 calcPosAndPermuteT: 0.05 arrayCopyT: 0.00 ranksAssignT: 0.00 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(32) vals) -- 0.98 MiB/s per node (0.26s) ./test-bin-large/UnitTestSort -nl 32 --elemsPerLocale=$((2**15)) --RSLSD_numTasks=128 checkSortedT: 0.00 arrayCreateT: 0.01 countDigitsT: 0.28 globalCountsT: 0.16 calcPosAndPermuteT: 2.53 arrayCopyT: 0.00 ranksAssignT: 0.00 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(32) vals) -- 0.08 MiB/s per node (2.99s) ./test-bin-large/UnitTestSort -nl 32 --elemsPerLocale=$((2**15)) --RSLSD_numTasks=2048 checkSortedT: 0.00 arrayCreateT: 0.04 countDigitsT: 18.93 globalCountsT: 0.41 calcPosAndPermuteT: 41.41 arrayCopyT: 0.00 ranksAssignT: 0.00 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(32) vals) -- 0.00 MiB/s per node (60.79s) ``` gasnet-ibv-fast (static registration): ``` CHPL_FLAGS="--gasnet-segment=fast -sperfOnlyCompile -sperfValRange='uint(32)'" make TEST_BINARY_DIR=test-bin-fast test-bin-fast/UnitTestSort CHPL_FLAGS="--gasnet-segment=fast -sperfOnlyCompile -sperfValRange='uint(32)'" make TEST_BINARY_DIR=test-bin-fast test-bin-fast/UnitTestSort ./test-bin-fast/UnitTestSort -nl 32 --elemsPerLocale=$((2**15)) --RSLSD_numTasks=1 checkSortedT: 0.00 arrayCreateT: 0.01 countDigitsT: 0.01 globalCountsT: 0.10 calcPosAndPermuteT: 0.03 arrayCopyT: 0.00 ranksAssignT: 0.00 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(32) vals) -- 1.67 MiB/s per node (0.15s) ./test-bin-fast/UnitTestSort -nl 32 --elemsPerLocale=$((2**15)) --RSLSD_numTasks=128 checkSortedT: 0.00 arrayCreateT: 0.01 countDigitsT: 0.12 globalCountsT: 0.11 calcPosAndPermuteT: 0.34 arrayCopyT: 0.00 ranksAssignT: 0.00 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(32) vals) -- 0.43 MiB/s per node (0.59s) ./test-bin-fast/UnitTestSort -nl 32 --elemsPerLocale=$((2**15)) --RSLSD_numTasks=2048 checkSortedT: 0.00 arrayCreateT: 0.03 countDigitsT: 1.92 globalCountsT: 0.32 calcPosAndPermuteT: 4.54 arrayCopyT: 0.00 ranksAssignT: 0.00 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(32) vals) -- 0.04 MiB/s per node (6.81s) ```

Dynamic registration overhead:

Here we're seeing that static registration is always faster, and pretty substantially so at higher task counts. This means a non-trivial amount of the overhead is from dynamic registration. This is surprising to me because I think we're only doing communication out of the aggregators, which have pretty high reuse to exchange buckets so it's not obvious where the overhead is coming from yet. My best guess is that maybe we're doing some operation in lockstep so it's possible a serialization effect from dynamic registration is hitting harder.

Note that the main reason we use dynamic registration over static today is because of high start up costs, especially so on systems with high amounts of memory. As the chapel team looks to make running with a locale per socket a recommended way of running, this will reduce some of those start up costs and may make this a viable alternative. In the meantime, I want to see if we can reduce the current overheads (either through algorithmic changes to the sort that reduce contention or to changes to gasnet if we find the bottleneck is there.) See also https://chapel-lang.org/releaseNotes/1.24/03-perf.pdf, which has some background on static vs dynamic registration (and why we currently default to dynamic)

Algorithmic overhead:

Lowering the number of tasks has a pretty substantial performance benefit so I think we'll want to come up with a heuristic to choose how many tasks to use. I'd probably start with something conservative and tune more over time. In general, I think it's hard to pick the perfect thresholds, so I'd probably start with something like requiring each task to have at least enough elements to fill a couple aggregation buffers and then tune over time as we get more experience. I think this is fairly easy to tune for small problem sizes, but it's less clear how to tune for strong scaling. i.e. if you want something to go faster by using more nodes, it seems strange to reduce the number of cores but that could still have a net benefit because while you might still have the same number of cores injecting you'll have more nodes doing so, which can increase total bisection bandwidth.

The other aspect to consider here is that we can reduce the number of buckets by decreasing the digit width from 16 bits to 8. This will add more rounds to the radix sort, but there's a tradeoff here between startup costs and number of iterations/rounds required. In my early experiments this seemed to always hurt performance, but that's counterintuitive to me, so I need more time to investigate.

Aggregator overhead:

I need to do more investigation here to get a sense of overheads, but I wanted to capture some misc thoughts if this ends up having more overhead than I think.

For really small arrays, some of the aggregators will only have a handful of elements, so it may not be worth doing remote allocations and may instead be faster to just do unordered copies. That said, I think the threshold for doing unordered copies is going to be very very small on IB and it's probably only be worthwhile on systems with decent concurrent unordered copy perf (just Aries/SS-11 today).

In terms of the allocations and creation overhead -- On first flush of the DstAggregator we'll do an on-stmt to allocate a remote buffer (which is then cached), then PUT there, then another on-stmt to flush the buffer. This is good in the steady state since we have short-lived remote tasks, but I wonder on first flush if we should change that do an on-stmt to allocate, GET the data, and flush. This would eliminate an on-stmt, which may be beneficial for aggregators that aren't used heavily. For SrcAggregator there is more overhead. We do 2 on-stmts for remote allocations, then a PUT, then on-stmt, then a GET. And on teardown we have an extra on-stmt to free a buffer (we free one of the buffers inline on the last flush, but the other is still in use then). I think we could combine those 2 on-stmts to allocate and we can probably inline the free if we change the last flush to PUT data instead of GETting after the on-stmt.

It's also possible that smaller buffers may be more beneficial for less heavily used aggregators. Today we also create an aggregator per-task, which is great for avoiding contention, but if the creation (and in particular remote allocations) get expensive we can do multi-hop aggregators where each tasks buffers into a larger node-local buffer and all between-node comm is done from that buffer. This will add contention, but may still be a win in this case.


Overall I should note that aggregators and sorting in general has been tuned for large sorts, so there is almost certainly a fair amount we can do to improve aggregating / sorting small amounts of data.

mhmerrill commented 2 years ago

@ronawho @pierce314159 I wanted to get the class wrapper for the RadixSortLSD so we could easily have a factory which could make sorts with different configurations so we could more easily optimize some of the things you are talking about here. #1338

ronawho commented 2 years ago

Given that the Chapel 1.27 release is coming out soon, I was focusing on looking for any changes we could make to Chapel to improve this situation (and then I'll switch more to looking at changes to Arkouda.)

To that end, https://github.com/chapel-lang/chapel/pull/19968 improved the scalability of scans on block distributed arrays, which should help. This should take the globalCountsT time in the original post from ~0.17s to ~0.01s now at 32 nodes. This also had some non-trivial performance benefits on other benchmarks

This will help a little with the performance of small sorts (though there are other larger overheads left) and should help with strong scaling, particularly at large scale (scans had a surprisingly large amount of communication with poor scaling behavior before)

ronawho commented 2 years ago

Aggregator overhead:

I don't have any concrete PRs to open yet, but I'm heading on vacation for a few weeks and wanted to capture some notes on my current investigations before I head out and forget details.

I've been pursuing a couple of different avenues here. First it's worth noting that aggregator creation seems to be more expensive than I was expecting and for small sorts we have a lot of single-use aggregators (aggregators where we only fill and flush the buffer once.) The bucket exchanges involve each task exchanging 2**16 elements total and 2**16 / numLocales with each locale. This means for InfiniBand systems each bucket exchange aggregator is single use when running on 8+ locales.

The main ways I can think of to improve this are:

Optimize single-use aggregators

For a DstAggregator a single-use aggregator involves an on-stmt to allocate a remote buffer, a blocking PUT to move local data to the remote buffer, and an on-stmt to flush the data and free the buffer. So we have 3 round trip network operations and 2 on-stmts that we have to wait to get scheduled.

On-stmts have a payload capacity and for small enough aggregator buffers we can store the buffer inline in the on-stmt payload. The amount we can send is implementation specific, but it's quite large at ~64KB for gasnet-ibv. For other layers like ugni it's only 1KB though. Still even if the buffer can't fit in the AM payload, we'll just do a blocking GET on the target side. This still eliminates 1 on-stmt that was allocating a remote buffer before.

I have two prototypes for this:

https://github.com/chapel-lang/chapel/compare/main...ronawho:chapel:opt-small-agg uses existing language functionality to copy the elements into a fixed size c_array and send them along with the active message. This has the benefit of working today, but the downside of being a fixed payload instead of just sending the bytes that are needed. We could improve this by creating a few size classes, but this will likely have some compilation time impact.

https://github.com/chapel-lang/chapel/compare/main...ronawho:chapel:pack-agg-into-am uses new runtime functionality to directly send the aggregation buffer (copying the normal active message header before the aggregation buffer.) This has the advantage of only sending the data we need and eliminating some copies, but does require new runtime support. It also requires either new language support or at least a new runtime supported data structure to work.

Both branches perform similarly today at least on InfiniBand with message sizes approaching AM payload size, and this is roughly 4x faster than the current scheme for single-use aggregators. Longer term I think the runtime assisted version has the best potential because we can negotiate with what the runtime and communication layer are capable of.

You could imagine implementing copy aggregators entirely in the runtime and that would give us a lot of flexibility at the expense of a lot of implementation work (a fair amount of work, and in C instead of Chapel, and across all our comm layers.) The other downside to this is a point solution for copy aggregators, but I think we want a general solution to be able to reuse for other aggregators down the road. Probably the ideal here is a runtime assisted "onStmtBundle" or something that allows local buffering, and then an optimized data transfer+AM.

Note that all my experiments so far have been with DstAggregator. SrcAggegator is harder because we have to "return" the gathered values from the target node. Today we do this with a GET from the initiator after an on-stmt. We could instead do that with a PUT from the target to optimize memory allocations at the expense of longer lived tasks. In an ideal world i think we'd "return" this along with the active message. An active messages is conceptually 2 pieces. A request and a reply. Above I'm talking about sending a buffer along with the request, but I think we could also arrange to send a buffer along with the reply. That's not something we have piped through today, but is possible. This is functionality that's already built into gasnet, but we would have to implement in ofi/ugni.

Have single-use aggregators use unorderedCopy

We could consider flushing with unorderedCopy instead of doing an on-stmt. Whether this makes sense is also network dependent. Small concurrent comms are fast under ugni, but the threshold for when this would win under gasnet-ibv is very low (like 16 or fewer elements per task or something.) I think this has potential in the future if we can optimize concurrent fine grain comm for more networks, but don't think this helps today at least on InfiniBand systems.

Reuse instead of recreating aggregators

Currently, aggregators are created and destroyed for each pass through radix sort. We could store the aggregators in a distributed array and re-use them across across iterations and amortize the creation cost. You could take this further and create global caches of these, but I think that gets harder to coordinate and the caches could get quite large.

Switch to multi-hop aggregators

Currently, each task has it's own aggregator that allows us to communicate with remote nodes without any synchronization. This is nice from the perspective of avoiding contention but when the number of elements being exchanged is small it means we have smaller buckets than ideal and more overhead to create aggregators. We could instead multi-level aggregation where each task would buffer locally and then we'd have a single aggregator per node that would communicate with remote nodes.

This introduces synchronization and the potential for long stalls while a task waits to merge it's buffer into the node-level buffer, but I think has potential if implemented well. https://github.com/chapel-lang/chapel/compare/main...ronawho:multi-hop-agg explores this.

Avoid aggregation

At least for the bucket exchange, we should be able to buffer all the buckets locally and do a bulk RDMA exchange. The problem with this is our local view isn't contiguous with the global bucket view, so we'd really be doing something like buffer everything locally, bulk exchange, and then after exchanging we'd have to reorder. I think this has promise though.


I don't think these efforts are necessarily mutually exclusive. We could get rid of aggregation for bucket exchange and still work on other optimizations for the aggregator used for the element exchange.

This isn't directly related to any of the points above, but I was also exploring using a gasnet "long" active message, which basically combines a PUT with an AM. This reduces the amount of communication needed. I wasn't seeing much benefit for the single-use buffers, but did see a large benefit in the steady state for larger sorts. This work is captured in https://github.com/chapel-lang/chapel/compare/main...ronawho:chapel:am-long-3.

Other misc notes are that we final flushing of aggregators and all the bucket exchanges in lockstep. By that I mean all aggregators will flush to locale 0 first, then locale 1, and so on. We should offset these to spread out the network load. In the steady state this doesn't matter much for large aggregations because things naturally become offset but for single-use aggregators where everybody finishes around the same time all the flushing happens sequentially to each node, which adds a lot of overhead.


My rough plan when getting back is to focus on improvements I think we can finish quickly. Where I want to see if we can avoid aggregation for sort bucket exchanges first. If that doesn't go well I'll switch to re-used multi-hop aggregators for the bucket exchange. Then I want to tune the number of tasks used for radix sort.

That should improve small sorts and help strong scaling somewhat, but I think there's a lot more we can do in the slightly longer term to really help strong scaling and performance for larger sorts.

ronawho commented 2 years ago

I had an idea in the background while my brain was off for vacation and had a couple minutes today to try it out. Instead of trying to do general multi-hop aggregators, we can instead do something a little more manual to combine the task buckets across a locale. This results in fewer and fuller aggregation buffers and avoids doing comm in lockstep. PR for that is up at https://github.com/Bears-R-Us/arkouda/pull/1635.

This should have some pretty significant improvements on sort scalability, which should improve small sorts at modest scale and improve strong scaling.

There's still more we can do including some of the steps outlined in the previous comment, so I want to keep this open, but #1635 should get rid of some of the worst overheads.

bradcray commented 2 years ago

I had an idea in the background while my brain was off for vacation

"Make sure Elliot gets more vacations": message received.

ronawho commented 1 year ago

There have been some pretty large improvements to sort scalability with upstream scan optimizations, improvements to sort bucket exchanging, and offsetting aggregation flushes:

I had brief access to an older 240 node SGI InfiniBand system to collect some scaling results and here's the scaling trend I see up to 240 nodes for trivial and modest problem sizes (512 KiB and 512 MiB per node):

ak-argsort-time-2-to-16

ak-argsort-time-2-to-26

Generally speaking, performance and scaling are quite a bit better now and ~10x faster at 240 nodes (~10K cores) for the 512 KiB problem size.

ronawho commented 1 year ago

There are some additional improvements we can make. Based on a suggestion from Brad, the main change that I'm currently thinking about is reducing the number of buckets that have to be exchanged. Today, the buckets/counts for all tasks get exchanged, but I think we can change this to compute the counts for each node and just exchange those counts. This does require storing the original task local counts so we can compute the task local starting positions from the node starting positions, but that's not too bad and it reduces the amount of buckets to exchange by numTasks.

This is prototyped in https://github.com/ronawho/arkouda/compare/b99799f03ba527bfc292f839c8d37022a3f12e89...ronawho:arkouda:add-sort-subtimers-smaller-exchange-3?w=1

I don't have access to the SGI machine anymore, so I'm back to smaller scale (but higher core count) machines. Here's some preliminary timings for a 32 node system with 128 cores per node for an almost empty array of 64-bit uints:

CHPL_FLAGS="-sRadixSortLSD.enableTimers --no-warnings -sperfOnlyCompile -sperfValRange='uint(64)' -stestKeys=false" make test-bin/UnitTestSort

./test-bin-clean/UnitTestSort -nl 32 --elemsPerLocale=$((2**1))
checkSorted    : 0.00s
arrayCreate    : 0.01s
computeLCounts : 0.03s
scatterLCounts : 0.10s
scanGCounts    : 0.02s
gatherLCounts  : 0.19s
putSorted      : 0.00s
swapArray      : 0.00s
rankAssign     : 0.00s
 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(64) vals) -- 0.00 MiB/s per node (0.37s)

./test-bin-opt/UnitTestSort -nl 32 --elemsPerLocale=$((2**1))
checkSorted     : 0.00s
arrayCreate     : 0.01s
computeLCounts  : 0.00s
scatterLCountsA : 0.01s
scatterLCounts  : 0.01s
scanGCounts     : 0.01s
arrayLCounts    : 0.00s
gatherLCounts   : 0.01s
adjustLCounts   : 0.01s
putSorted       : 0.00s
swapArray       : 0.00s
rankAssign      : 0.00s
 radixSortLSD_ranks(A:[] int(64)) -- (rand uint(64) vals) -- 0.00 MiB/s per node (0.07s)

It'll be a few weeks until I have time to clean up and PR this code, but wanted to capture the current thinking. Once that work is in we're back to a state where we're exchanging a pretty small amount of data and we'll have single-use aggregators so it may be worth revisiting some of the single-use aggregator optimizations. It'd also be nice to evaluate this work at higher scale, but it'll be a while until I have access to any larger machines again.