libMesh / libmesh

Add non-blocking set, etc. receives #1684

Open roystgnr opened 6 years ago

roystgnr commented 6 years ago

I'd thought that "how do you figure out the proper size for a temporary buffer" was an impossible issue to solve, but one solution is simple: "the same way you figure out the proper size for a non-blocking vector receive: make the user do it".

Granted, making the user preallocate with std::vector::resize() leads to code that doesn't look as dumb as making the user initialize std::set with a counting_iterator or making them fill it with a loop, but either way the library would be happy.

On the other hand, in the set case (and in the map case, w.r.t. keys) unlike in the vector case, any preallocation is wasted unless it happens to preallocate an element which will be received, because otherwise it'll just have to be deleted. The waste here is on the order of having to use a temporary buffer in the first place, but if we ever really want to do a non-blocking set or map receive maybe we should just bite the bullet and make them use a non-standard argument list with an extra size_t to specify incoming size.
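For concreteness, here is roughly what user-side presizing looks like in the two cases (schematic only; the receive call itself is omitted and the element types are arbitrary):

#include <set>
#include <vector>

void presize_buffers(std::size_t incoming_size)
{
  // Vector case: one resize() call, and every preallocated slot will be
  // overwritten by received data.
  std::vector<double> vbuf;
  vbuf.resize(incoming_size);

  // Set case: the best we can do is stuff in placeholder keys ahead of
  // time...
  std::set<int> sbuf;
  for (std::size_t i = 0; i != incoming_size; ++i)
    sbuf.insert(static_cast<int>(i));
  // ...but any placeholder that doesn't match a key that actually arrives
  // just has to be erased again, so the "preallocation" is wasted work.
}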

friedmud commented 6 years ago

No. @permcody and I were just discussing that nothing in parallel.h should ever assume the user has pre-sized the vector. The few cases where that is true are infuriating and should be changed.

Further, for true non-blocking with unknown size messages this is nearly impossible.

Even further: for complex types you can't predict the size because the size is dependent on the way the packing is done. For instance, the new std::vector<std::vector<T>> packs into a std::vector<char> and encodes the sizes of each vector within there. How is a user going to know how to size the receive buffer there?
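For concreteness, the kind of packing being described might look roughly like this sketch (schematic only, not the actual libMesh packing code):

#include <cstring>
#include <vector>

std::vector<char> pack(const std::vector<std::vector<double>> & data)
{
  std::vector<char> buf;
  for (const auto & inner : data)
    {
      // Encode each inner vector's length alongside its payload, so the
      // final buffer size depends on both the number of inner vectors and
      // how the sizes are encoded -- not something a receiver can predict.
      const std::size_t n = inner.size();
      const char * nbytes = reinterpret_cast<const char *>(&n);
      buf.insert(buf.end(), nbytes, nbytes + sizeof(n));
      const char * dbytes = reinterpret_cast<const char *>(inner.data());
      buf.insert(buf.end(), dbytes, dbytes + n * sizeof(double));
    }
  return buf;
}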

No: the right way to do this is already built into the MPI library itself: Statuses. The stat tells you the size. It works. Every time.

There is nothing wrong with stat based code - it doesn't look dumb.

I propose that nonblocking receives should internally do an iprobe() to get a status, then resize the temporary buffer. If there is no message they will return false - returning true if there was a message.

So for receive() the signature would look like:

bool receive (const unsigned int src_processor_id,
              T & buf,
              Request & req,
              const MessageTag & tag=any_tag) const;

Internally it would do a non-blocking probe, get the status, resize the buffer, attach post-wait work and return true (if there is a message) or return false if there's no message.
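For concreteness, the MPI-level steps inside such a receive() might look roughly like the sketch below (illustration only, not libMesh code; the buffer type is fixed to double and the post-wait bookkeeping is elided):

#include <mpi.h>
#include <vector>

bool try_post_receive(std::vector<double> & buf, MPI_Request & req,
                      int tag, MPI_Comm comm)
{
  int flag = 0;
  MPI_Status status;
  MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);
  if (!flag)
    return false;                       // no message waiting

  int count = 0;
  MPI_Get_count(&status, MPI_DOUBLE, &count);
  buf.resize(count);                    // size the buffer from the status

  MPI_Irecv(buf.data(), count, MPI_DOUBLE, status.MPI_SOURCE,
            status.MPI_TAG, comm, &req);
  return true;                          // post-wait work would be attached here
}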

To use this you would do something like (with a few details left out):

while (true)
{
  if (receive (any_source, buff_to_fill, req, tag))
    current_receives.push_back(std::make_pair(buff_to_fill, req));

  for (auto & rec : current_receives)
  {
    if (rec.second.test())    // has this request completed?
    {
      rec.second.wait();      // required to do the post-wait work
      // deal with the data within rec.first (the filled-in buffer)
    }
  }
}

This basic structure is fully asynchronous and allows overlapping of dealing with the buffers and pulling them into memory. It will work with any type of object and any size of buffer without needing to know anything about the sizes of them beforehand.

roystgnr commented 6 years ago

How does this algorithm (which doesn't actually post a receive until the probe succeeds) square with what you told me the other night,

The nonblocking receive can pull data off the line “in the background” while you’re doing other work (like dealing with other receives) - so you can overlap communication and computation.

As far as MPI is concerned, we haven't actually done a non-blocking receive until MPI_Irecv is called. If the only time we call MPI_Irecv is after a probe succeeds, then with an eager protocol message there's nothing left for the MPI implementation to do except a memcpy. That's not overlapping communication and computation; that's just overlapping computation with computation.

With a rendezvous protocol message there may be a communication delay between when you post the receive and when the receive is actually done, so that's a plus.

But in the eager case and especially in the rendezvous case, what you'd really like to do is post the receive before the send! With an eager protocol message, MPI can then put data straight into the buffer you've provided, rather than allocating a temporary buffer and then putting data into it and then copying out of it again and then deallocating it. With a rendezvous protocol message, MPI then can immediately respond to the sender and tell it to begin sending the rest of the data right away.
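As a toy illustration of that pre-posting argument (not libMesh code; the rank numbers, message size, and tag are made up):

#include <mpi.h>
#include <vector>

void preposted_exchange(int rank, MPI_Comm comm)
{
  const int tag = 0;
  std::vector<double> buf(1000);

  if (rank == 1)
    {
      // The receive buffer is registered with MPI before the send even
      // starts, so eager data can land directly in user memory and a
      // rendezvous transfer can begin as soon as the sender is ready.
      MPI_Request req;
      MPI_Irecv(buf.data(), int(buf.size()), MPI_DOUBLE, 0, tag, comm, &req);
      // ... other computation here can overlap the incoming transfer ...
      MPI_Wait(&req, MPI_STATUS_IGNORE);
    }
  else if (rank == 0)
    {
      for (std::size_t i = 0; i != buf.size(); ++i)
        buf[i] = double(i);
      MPI_Send(buf.data(), int(buf.size()), MPI_DOUBLE, 1, tag, comm);
    }
}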

I like your API too, and I'll add it. Presizing buffers is easy in literally 19 out of 20 of the cases I've been looking at, but that remaining 5% of cases ranges from "slightly less efficient" to "huge pain in the ass", and we should have other alternatives for it.

But I'm not getting rid of existing APIs that are better for the other 19 cases. At best you could talk me into trying harder to catch the most common misuse case: we could assert that such buffers aren't zero-sized, since "trying to put a non-zero-length message into a zero-length buffer" is always a bug and "trying to communicate a zero-length message" is usually a bug.

friedmud commented 6 years ago

Meh - that's an overly alarmist article about "evil" unexpected messages. All it says is that RAM is used for eager receives... but so what? RAM is going to be used whether you posted the receive first or not. I've sent millions/billions of unexpected messages in my current research code (where there's actually no way to know about them beforehand) without any adverse effects that I can see.

"Eager" messages aren't really worth talking about in my opinion... they're small (bytes to a couple k-bytes) anyway so MPI can handle them however it best can. They'll be efficiently buffered until dealt with. Sure - if there's a receive posted the data can flow right into that buffer - but we're talking about tiny messages anyway so buffer and copy isn't going to be a big deal. Not too mention that on clusters that buffering might be done down on the network card hardware anyway - and require a copy no matter what.

Rendezvous is where things get interesting. The only thing buffered is the message envelope - which is what a probe is looking for. Once you spot an envelope you then have all the information you need to receive the message: where it's coming from and what size it is. So you can presize the receive buffer (based on the probe) and kick off the rest of the rendezvous algorithm (handshake, transfer, etc.) with an iReceive. Then you can go off and do other work (including looking for more messages, starting more iReceives and dealing with messages that have been completely received) while you wait for that buffer to fill up.

Note also that even if you post iReceives up front... there is no guarantee that MPI will get any CPU time to service them (to start a transfer, etc.) while you're doing other productive work. Depending on your MPI implementation, it might just sit there until you come back around and call wait before it gets any time to start up a receive. This is very implementation and hardware dependent though.

Now: let's deal with the heart of the problem with "your" algorithms (i.e. "knowing the message size ahead of time"). For unstructured data where you can't algorithmically know/compute the message sizes independently (such as sparsity data - or ghost neighbor data - etc.) you have to somehow communicate things to other processors:

  1. That there will be a message at all
  2. The number of messages
  3. The size of the messages

So far, you're using alltoall() to do that: but here's the deal... that doesn't scale. While vendors have (of course) come up with the smartest algorithms they can for alltoall()... there's no way around the fact that you are essentially doing a p^2 (num_procs^2) amount of data movement. For basic topologies it really is p^2... for fancier topologies (like a hypercube) it can go down to p logp.

As you get to larger numbers of processors, with sparser communication patterns... that alltoall() time will creep up on you. alltoall() can fairly effectively flood a network.

Doing an alltoall with extremely sparse communication (like the case for ghost neighbor communication with lots of processors... you're only going to end up with ~10 neighboring procs in 3D) is a HUGE waste of time. All just to make sure that a receive is ready and waiting on the other end once you finally finish the alltoall and get around to sending the data?

Nope: for sparse communication, it is MUCH better (much more scalable) to use isend/iprobe/irecv.

Think about running a problem on 10k procs. With the alltoall you are going to (at best) do ~100k (and more likely ~1M on a normal cluster) sends/receives just to find out who is going to be sending/receiving and how big the messages are. Then you're going to do the 100k (assuming ~10 neighbors) actual sends / receives.

Instead of doing that - just do the 100k sends as "unexpected" point-to-point communication with isend/iprobe/irecv.

With alltoall, at best you're doubling the time the exchange takes, and it may even be ~10x slower... whereas isend/iprobe/irecv only adds a (possibly nonexistent, depending on what you're doing) bit of latency.
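For reference, here is a sketch of one common way to structure that kind of sparse exchange - in the spirit of the NBX algorithm, using MPI-3's non-blocking barrier for termination detection. This is an illustration under those assumptions, not necessarily what the linked benchmark code does; it assumes outgoing[i] is the message destined for neighbors[i].

#include <mpi.h>
#include <vector>

void sparse_exchange(const std::vector<int> & neighbors,
                     const std::vector<std::vector<double>> & outgoing,
                     std::vector<std::vector<double>> & incoming,
                     MPI_Comm comm)
{
  const int tag = 42;                    // arbitrary tag for this exchange

  // Synchronous isends: each completes only once its receive has matched.
  std::vector<MPI_Request> send_reqs(neighbors.size());
  for (std::size_t i = 0; i != neighbors.size(); ++i)
    MPI_Issend(outgoing[i].data(), int(outgoing[i].size()), MPI_DOUBLE,
               neighbors[i], tag, comm, &send_reqs[i]);

  MPI_Request barrier_req = MPI_REQUEST_NULL;
  bool barrier_started = false, done = false;
  while (!done)
    {
      // Receive anything that has shown up, sizing the buffer from the probe.
      int flag = 0;
      MPI_Status status;
      MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);
      if (flag)
        {
          int count = 0;
          MPI_Get_count(&status, MPI_DOUBLE, &count);
          incoming.emplace_back(count);
          MPI_Recv(incoming.back().data(), count, MPI_DOUBLE,
                   status.MPI_SOURCE, tag, comm, MPI_STATUS_IGNORE);
        }

      if (!barrier_started)
        {
          // Once all of our sends have been matched, join the barrier.
          int sends_done = 0;
          MPI_Testall(int(send_reqs.size()), send_reqs.data(), &sends_done,
                      MPI_STATUSES_IGNORE);
          if (sends_done)
            {
              MPI_Ibarrier(comm, &barrier_req);
              barrier_started = true;
            }
        }
      else
        {
          // When everyone has joined the barrier, no more messages are coming.
          int barrier_done = 0;
          MPI_Test(&barrier_req, &barrier_done, MPI_STATUS_IGNORE);
          done = (barrier_done != 0);
        }
    }
}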

Anyway - it's 4AM... so I'm going to go to bed now 😄 If I have time tomorrow I'll work up a quick example code that we can use to do some scaling studies...

friedmud commented 6 years ago

Ok - spent some time today putting together a code to test all of these theories (why argue when we can just test?)

You can take a look at the code here (look in src/main.C): https://github.com/friedmud/sparse_send_receive

I implemented 5 different communication patterns:

So... here are the results of a scaling study with 10 "neighbors" sending messages of 100 doubles and running 300 iterations of each algorithm (you can run this case by compiling the code and running mpiexec -n 8 ./sparse-opt 10 100 300):

[screenshot: scaling study timing results for each algorithm vs. processor count]

As you can see - full_async is orders of magnitude faster by the time you get to 1k procs... and its dominance continues at 2k. I expect the gap to continue to grow to 5k procs and beyond. The AllToAll is simply not scalable.

Now - the overhead of full_async does make it a bit slower when the processor count is low... but who cares? At 8 procs for instance it's taking 10 milliseconds to do 300 iterations with full_async. It doesn't matter that it's slower than the other algorithms because none of them take any time when the number of procs is small.

I am also just struck by how slow these algorithms get at scale. Each proc is only exchanging with 10 neighbors! But it takes a huge amount of time. As we continue to try to go bigger we're really going to have to think about this more!

I also did a couple more studies to see how message size affects the timing:

[screenshots: timing results for two additional message sizes]

Again: you can see that full_async can be ~20x faster or more... but if the message size starts to get huge then the time to communicate the message data itself starts to dominate which brings all of the algorithms back into alignment.

You can see the full raw data for these graphs here: https://docs.google.com/spreadsheets/d/1xSRCJSwyvHtybQ4a-V4Z7eEuNdeqPqBTdQOnaBO5Le4/edit?usp=sharing

roystgnr commented 6 years ago

That's ridiculously clever.

Your claim seemed like a paradox to me - how can you be faster than an alltoall, when the algorithm inherently is an alltoall of message counts? At the end of it every processor knows the number of messages sent to it by every other, after all. And honestly, I still don't get it. Doing the "alltoall" asynchronously lets it overlap the rest of the communication, but that should make things maybe 2x faster, not 10x faster. You're effectively doing an alltoall(bools) rather than alltoall(ints), but I don't see how that buys you another factor of 5.

friedmud commented 6 years ago

Haha - thanks. This is what my dissertation is about - so it should be clever!

There are two things giving you speed here:

  1. The reductions are non-blocking. This means that you get to start sending / receiving immediately... and the reductions can just slip in there whenever they can.

  2. A reduction is not the same amount of communication as alltoall. AllToAll has to get unique data from every processor to every other processor. Reductions only need to get the aggregate everywhere.

As stated before, the basic complexity for alltoall is p^2 and the very best you can do is p log p... for allreduce it's only log p! That's a huge difference!
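To put rough numbers on that (illustrative arithmetic only, ignoring constants and message sizes): at p = 10,000 procs, p^2 = 10^8, p * log2(p) ≈ 1.3 x 10^5, and log2(p) ≈ 13.3. Even against the best-case p log p alltoall, a log p allreduce is roughly four orders of magnitude less work.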

Check out some good performance analysis of these algorithms here: https://www8.cs.umu.se/kurser/5DV050/VT11/F2.pdf

(I love that PDF - really breaks things down well!)

BTW - I figured out why that stupid Barrier is needed here. It's not actually required by the algorithm (and isn't in my real production code)... it's because in this benchmarking scenario one processor can go ahead and start doing the next iteration while others are still working on this iteration... and messages end up getting crossed up.

If you look at the newest version of the algorithm I've made a few improvements that make it more similar to what I really do in production (although - that has MANY other things in it including "smart" sending queues with "progressive buffering" etc.).