open-mpi / ompi

Open MPI main development repository
https://www.open-mpi.org

Point to point protocols sometimes take a long time to complete #10474

Open · frizwi opened this issue 2 years ago

frizwi commented 2 years ago


Background information

What version of Open MPI are you using? (e.g., v3.0.5, v4.0.2, git branch name and hash, etc.)

Open MPI v4.1.2

Describe how Open MPI was installed (e.g., from a source/distribution tarball, from a git clone, from an operating system distribution package, etc.)

Compiled from release source - with UCX

Details of the problem


Problem I'm trying to solve: halo exchange for an ocean model running on an unstructured mesh.

Solution 1: I was using RMA; each partition process would do:

MPI_Win_fence(0, win);
MPI_Get(...);
MPI_Win_fence(0, win);

Scaling performance was terrible: as the number of partitions increased, the total number of exchanges did not decrease much, while the exchanges between partitions got smaller. I tried all sorts of synchronisation methods; eventually the advice from the HPC folks was: do not use RMA.

Solution 2: I switched to MPI_Sendrecv() and performance (total time for comms) increased 10-fold!
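
Roughly, each exchange now looks like the following (just a sketch with placeholder names nnbr, nbr, send_type and recv_type, not my actual variables):

    /* Sketch only: one MPI_Sendrecv per neighbour; the send/recv derived
     * datatypes select disjoint sets of indices in the same field array. */
    for (int i = 0; i < nnbr; i++) {
        MPI_Sendrecv(field, 1, send_type[i], nbr[i], 0,
                     field, 1, recv_type[i], nbr[i], 0,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }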

BUT, this was in my test program. While the comms time in my full application did improve most of the time, every now and then the Sendrecv() would take many seconds to complete. So the timing would look something like this:

     fill_3d_w2w    :           3.64980
     fill_3d_w2w    :           1.35756
     fill_3d_w2w    :           0.01945
     fill_3d_w2w    :           0.01938
     fill_3d_w2w    :           0.01928
     fill_3d_w2w    :           0.01969
     fill_3d_w2w    :           9.61830
     fill_3d_w2w    :           0.01991
     fill_3d_w2w    :           0.01956
     fill_3d_w2w    :           0.01946
     fill_3d_w2w    :           0.01933
     fill_3d_w2w    :           0.01984
     fill_3d_w2w    :           0.01907
     fill_3d_w2w    :           0.01945
     fill_3d_w2w    :           0.01974
     fill_3d_w2w    :           0.01916
     fill_3d_w2w    :           0.38533
     fill_3d_w2w    :           7.96889
     fill_3d_w2w    :           0.01937
     fill_3d_w2w    :           0.01916
     fill_3d_w2w    :           0.01936
     fill_3d_w2w    :           0.01932
     fill_3d_w2w    :           0.01008
     fill_3d_w2w    :           0.01040
     fill_3d_w2w    :           0.01947
     fill_3d_w2w    :           0.02222
     fill_3d_w2w    :           0.01956
     fill_3d_w2w    :           0.01943
     fill_3d_w2w    :           0.01919
     fill_3d_w2w    :           0.01946
     fill_3d_w2w    :           0.00461
     fill_3d_w2w    :           11.92141
     fill_3d_w2w    :           0.02010

There is an MPI_Barrier just before the comms, and I'm wrapping gettimeofday to get the timing. I have also tried Isends with Recvs, and even created a Dist_graph and used MPI_Neighbor_alltoallw, all with similar results.

While MPI_Get was slower, it was always consistent/stable, i.e. the timing for each iteration was very similar. Of course, load imbalance can always be an issue, but here I'm just doing a tic/toc around the comms, with a Barrier just before.
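
The measurement itself is nothing fancy; roughly this (a sketch, not the exact code, which wraps gettimeofday rather than MPI_Wtime):

    MPI_Barrier(MPI_COMM_WORLD);   /* every rank starts the exchange together */
    double t0 = MPI_Wtime();
    fill_3d_w2w();                 /* the halo-exchange routine being timed */
    double t1 = MPI_Wtime();
    printf("     fill_3d_w2w    : %.5f\n", t1 - t0);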

Note also, this is all on the same node.

Any ideas?

frizwi commented 2 years ago

Just to clarify: the timings above are per iteration, with the same packet size each time.

BenWibking commented 2 years ago

I regularly run on this cluster. I suspect it may be OS noise, rather than something related to OpenMPI itself.

For instance, running the "fixed work quantum" benchmark from http://www.unixer.de/research/netgauge/osnoise on 48 cores of a single node, I get the result in the attached plot (fixed_work_quantum).

Note that this benchmark tests CPU loop timing variations, not MPI communication variations, so it is a measure of OS (thread scheduling) noise. The nodes on this cluster appear to be very noisy.

BenWibking commented 2 years ago

On this system, it looks like there are timing variations of ~100s of microseconds on a loop iteration:

$ mpirun -np 48 ./netgauge -x noise
# Info:   (0): Netgauge v2.4.6 MPI enabled (P=48, threadlevel=0) (./netgauge -x noise )
# initializing x86-64 timer (takes some seconds)
# Info:   (0): writing data to ng.out
# Info:   (0): performing Selfish benchmark
# min clock cycles per rank: 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 60 20 20 20 20
Minimal cycle length [ns]: 6.264422
Number of iterations (recorded+unrecorded): 37498550245
Threshold: [% minimal cycle length]: 900
[0] CPU overhead due to noise: 0.15%
[0] Measurement period: 252.40 s

[attached plot: detour_noise]

I will defer to the developers as to whether this could explain the timing variations you're seeing.

jsquyres commented 2 years ago

FYI @open-mpi/ucx

bosilca commented 2 years ago

One-sided communications are not a replacement for two-sided communications. One-sided communications have well-defined reasons to exist, and they usually perform well only under those conditions. Going back to the example above, Fence + Get + Fence has basically no chance of performing better than MPI_Sendrecv: while it removes the handshake of the send/recv, it adds two additional handshakes for the fences. Without additional computation between the fences to get some overlap, this is a terribly performing approach.

The noise in the system might be a reason for the variation. There is a simple check for this: leave a core or two for the rest of the OS, and the spikes, if they are due to system noise, should disappear.

Looking at the 2-sided communication pattern: if your messages are mostly on the small side (below the so-called eager size) you might inadvertently introduce buffering on the receiver side if the receiver is not ready before the data arrives. You can force the eager size to 0 (which forces all communications to do a handshake, i.e. a cost of at least twice the network latency) to check whether it has any impact.
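
For example, something along these lines should work as a quick test (the exact parameter depends on which PML/transport is actually selected on your system, and ./your_app and the process count are placeholders):

    # If the UCX PML is the one in use, force the rendezvous protocol for all message sizes:
    mpirun -np <nprocs> --mca pml ucx -x UCX_RNDV_THRESH=0 ./your_app

    # To see which eager-related parameters your build actually exposes:
    ompi_info --all | grep -i eager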

Last but not least, independent of the choice of communication paradigm (one- or two-sided), the code needs to make sure it distributes the communication load uniformly, and that it gives MPI an opportunity to use this information. In other words, try to balance your communications to avoid hot spots, where a particular node is the origin or target of multiple concurrent communications.

Let me give you an example that might be relevant to your application. If I understand correctly, your communication scheme is some form of neighborhood all-to-all: ranks send data to adjacent domains, so potentially to a subset of MPI ranks. Let's talk about two ways of implementing this:

  1. have all processes exchange data with peers in increasing order of rank, so first everyone to rank 0, then rank 1, and so on. From a communication perspective the network will not be used evenly, and the performance of this implementation would be terrible in all cases (one- or two-sided communications);
  2. have each process exchange data with a neighbor at a fixed distance, then increase the distance and repeat. Communications will be more evenly distributed, resulting in better performance, but this scheme remains subject to noise (because a slowdown of any process will propagate, due to the communication pattern, to all other participants).

We can argue about the benefits and drawbacks of these communication schemes, but at the end of the day none of them provides MPI with a global view of the communications, and each process (at best, each node) will try to schedule the locally known communications optimally from its own perspective. It is possible that a neighborhood collective is a better approach, one that puts the burden, and the opportunity, of correctly scheduling communications on the MPI library.
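
For illustration, a rough sketch of that last option, with placeholder names (nnbr, nbr, send_type, recv_type, field) standing in for your neighbour list, derived datatypes and data array:

    /* Describe the halo graph once, and let MPI own the schedule. */
    MPI_Comm halo_comm;
    MPI_Dist_graph_create_adjacent(MPI_COMM_WORLD,
                                   nnbr, nbr, MPI_UNWEIGHTED,  /* ranks I receive from */
                                   nnbr, nbr, MPI_UNWEIGHTED,  /* ranks I send to */
                                   MPI_INFO_NULL, 1 /* reorder */, &halo_comm);

    /* One call per iteration: counts are 1 and displacements 0 because the
     * derived datatypes already pick out the right indices relative to the
     * base pointer of the field (send and receive types must cover disjoint
     * index sets). */
    std::vector<int> counts(nnbr, 1);
    std::vector<MPI_Aint> displs(nnbr, 0);
    MPI_Neighbor_alltoallw(field, counts.data(), displs.data(), send_type,
                           field, counts.data(), displs.data(), recv_type,
                           halo_comm);

The nonblocking variant MPI_Ineighbor_alltoallw also exists if you want to overlap the exchange with local work.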

benmenadue commented 2 years ago

@BenWibking Hmm, that's unexpected, thanks for letting us know. When we originally tested our OS we couldn't see much in the way of "OS-level noise". I'm wondering if one of the OS updates over the last few years has introduced this -- I think there were some significant updates to the kernel's task scheduler in one of them. While our periodic benchmark jobs haven't shown any degradation in performance, they're not exactly communication- and latency-bound and so might not have shown this up.

I guess it's time to re-run those tests and see if we can identify the cause.

BenWibking commented 2 years ago

While our periodic benchmark jobs haven't shown any degradation in performance, they're not exactly communication- and latency-bound and so might not have shown this up.

@benmenadue Just a suggestion for benchmarks, if you're not already running these: the Mantevo mini-apps will definitely stress test the halo exchange type of communication as used in CFD and weather codes, particularly the miniAMR and miniGhost apps. The DOE Kripke benchmark code is also exceptionally sensitive to timing jitter: https://github.com/LLNL/Kripke

frizwi commented 2 years ago

It's annoying that I cannot reproduce this behaviour in my test program, i.e. the timings are always consistent there. So there must be something specific about the data structures/memory alignment in the real program that's causing the issue, but I can't see what.

Basically my workflow is as follows:

foreach (dataset)
  foreach (neighbour)
    MPI_Isend(...)

foreach (dataset)
  foreach (neighbour)
    tic
    MPI_Recv(...)
    toc

The dataset is a std::vector of struct pointers (variable metadata plus a pointer to the actual data). It's the same across all processes.

neighbour is a std::map whose keys are the MPI ranks of this rank's neighbours and whose values are src/dst custom MPI_Datatypes (indexed_block).

The sizes for each dataset can vary, and neighbours can be spread across the node, but once the program starts everything should be the same for every simulation iteration. Because the processes could be spread out and not every Recv is the same size, you can easily expect the time for each Recv to be slightly different, BUT what I'm seeing are occasional variations of 1000x.
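
For reference, each of those datatypes is built along these lines (a sketch with placeholder names ncells and cell_indices, and assuming double data, not the real code):

    MPI_Datatype dtype_src;
    MPI_Type_create_indexed_block(ncells,        /* number of cells for this neighbour */
                                  1,             /* one element per cell */
                                  cell_indices,  /* their positions in the data array */
                                  MPI_DOUBLE, &dtype_src);
    MPI_Type_commit(&dtype_src);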

Leaving a few idle cores didn't seem to help

I have also tried:

foreach (dataset)
  foreach (neighbour)
    MPI_Sendrecv(...)

And even:

foreach (dataset)
  err = MPI_Neighbor_alltoallw()

All with similar results. I really like the last one since it gets rid of the inner loop, but how can the virtual topology influence which rank lands on which processor core, given that the processes have already been launched? And what exactly does reorder do in MPI_Dist_graph_create_adjacent?

One thing I thought might be an issue is that for each dataset, the data pointer is the same for both send/recv. The various MPI_Datatype definitions take care of which indices go to which neighbour and vice versa. But changing this to a contiguous buffer for the recvs and then unpacking the array didn't alter the results at all, so that's not it.

Also, how do I set eager to 0?

bosilca commented 2 years ago

Many open questions; let me try to focus my answer on a particular topic, the handling of concurrent messages. For reference I copied here the communication scheme you mentioned in your message:

foreach (dataset)
  foreach (neighbour)
    MPI_Isend(...)

foreach (dataset)
  foreach (neighbour)
    tic
    MPI_Recv(...)
    toc

The sender is unrestricted: it has a lot of nonblocking sends to perform, in some order. But before it even tries to figure out how to send the data, it starts the communication by sending the handshake envelope (allowing the receiver to match the incoming message with a posted receive), supplemented with the eager data. On the receiver side, if the message can be matched right away (aka an expected message), the eager data is pushed up into the user buffer and the rest of the data is retrieved from the peer. However, if the receive was not yet posted (aka an unexpected message), the envelope together with the eager data needs to be saved locally, resulting in a memory allocation in the critical path.

Looking at the way the receiver posts the receives, you can see they will be posted sequentially per neighbor, which means that at any moment a single receive is expected; everything else will arrive unexpected and will incur the allocation and copy overhead I mentioned above. But that's not the only problem. In your tic and toc functions you are taking the CPU away from the MPI library, so the MPI library cannot extract messages from the network, and because the sender is unrestricted and will keep pushing handshakes and eager data, messages will keep accumulating on the receiver's network card. The next MPI call will then need to extract most of them, or at least enough of them until it matches the blocking receive currently expected by the code, and this can be extremely costly. Thus, the communication scheme is far from optimal, and you are lucky that you see spikes only from time to time.

Assuming the buffers on the receiver side can be allocated beforehand, you might want to pre-post all receives from the neighbors before waiting for them to complete and applying local updates. Basically, replace the receiver code with a loop over all neighbors posting the nonblocking receives, then do a bunch of waitany or waitsome calls and apply the corresponding tic and toc on the receives as they complete. I realize I'm not making this sound easy, but basically you need to give MPI more opportunities to extract data from the network, and minimize the opportunities for generating unexpected messages.
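
Something along these lines, per dataset (a sketch with placeholder names nnbr, nbr, send_type, recv_type and field, and assuming the send and receive datatypes select disjoint parts of the array, as is typical for a halo exchange):

    /* Pre-post every receive, then start the sends, then drain completions
     * as they arrive instead of blocking on one neighbor at a time. */
    std::vector<MPI_Request> rreq(nnbr), sreq(nnbr);
    for (int i = 0; i < nnbr; i++)
        MPI_Irecv(field, 1, recv_type[i], nbr[i], 0, MPI_COMM_WORLD, &rreq[i]);
    for (int i = 0; i < nnbr; i++)
        MPI_Isend(field, 1, send_type[i], nbr[i], 0, MPI_COMM_WORLD, &sreq[i]);

    std::vector<int> done(nnbr);
    int remaining = nnbr;
    while (remaining > 0) {
        int count;
        MPI_Waitsome(nnbr, rreq.data(), &count, done.data(), MPI_STATUSES_IGNORE);
        /* apply local updates (and tic/toc) for the receives whose indices
         * are in done[0..count-1] */
        remaining -= count;
    }
    MPI_Waitall(nnbr, sreq.data(), MPI_STATUSES_IGNORE);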

frizwi commented 2 years ago

@bosilca, okay, so if I understand you correctly, there is contention going on because the sends are not immediately matched and hence there is a need for excessive buffering. Plus, as this is a single-threaded program, even the non-blocking sends take CPU time away from doing the receives.

Now, if I modify the code as you suggest, say for a single variable:

tic
foreach (neighbour)
  MPI_Irecv(..)

foreach (neighbour)
  MPI_Send(..)

Waitall()
toc

I would have thought that in this case, since all the receives are posted first, as soon as each process gets to the send it would immediately match and away we go. But perhaps, while the non-blocking MPI_Irecv call returns to the application, the MPI library hasn't necessarily had time to set up the buffers before the send is posted. Is this plausible?

The result is that the overall timing for the entire function varies wildly, from a few milliseconds to >10 seconds. So the spikes are still there; not much better than any of the previous schemes.

So then I thought of serialising the whole process such that there is only ever a single send/recv pair operation going on at any one time, and in a deterministic order. Since I know the graph up front, I can broadcast/gather all the information on all processes beforehand. My comms function then looks like the following:

  void getData(void) {
    int err;

    for (auto it = mpi_data.begin(); it != mpi_data.end(); it++) {
      /* Loop over all window nodes */
      for (int n = 1; n < mpi_size; n++) {
        if (n == mpi_rank) {
          /* My turn to receive: one Recv per source window */
          for (auto ita = arr.begin(); ita != arr.end(); ita++) {
            /* key is the source window */
            int wn = ita->first;

            /* Reference to the element */
            SrcDstPair &sdpair = ita->second;

            err = MPI_Recv((*it)->ptr, 1, sdpair.dtype_dst, wn, wn,
                           MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            MPI_ERR(err);

            MPI_Barrier(MPI_COMM_WINDOWS);
          }
        } else { /* not my turn to receive */
          int i, src, dst;

          /* src_counts/srcs cache arr.size() and the arr.begin->end
             indices from the recv loop above */
          for (i = 0; i < src_counts[n-1]; i++) {
            src = srcs[n-1][i];
            dst = n;

            if (src == mpi_rank) {
              /* Reference to the element */
              SrcDstPair &sdpair = arr[dst];

              err = MPI_Send((*it)->ptr, 1, sdpair.dtype_src, dst, src,
                             MPI_COMM_WORLD);
              MPI_ERR(err);
            }
            MPI_Barrier(MPI_COMM_WINDOWS);
          }
        }
      }
    }
  }

The Barriers are there to ensure that no process runs ahead. But I still get the spikes! I know this is not optimal (e.g. plenty of idle processes when there could be many simultaneous comms pairings), but all I'm trying to do right now is get to a stage where the communications happen in such a deterministic way that the timings are stable.

Is there any debugging available in the Open MPI library that I can invoke that will tell me who's waiting for what, to get an idea of what's going on?

frizwi commented 2 years ago

Hi, I'm still trying various ways to organise this. It seems like what I need is graph "matching", i.e. pairing up the send/recvs between processes.

In the meantime, I'm trying a different approach that channels all comms via the master process, i.e. gather/scatter. However, what I'm looking for is MPI_Gatherw/MPI_Scatterw, but these are not available. What's the best way to mimic this? I have custom datatypes for each process which are not in contiguous memory, so I can't easily use the vector variants. I'm worried that pack/unpack will be costly, and if I use send/recv then that's not going to scale well for hundreds of processes.

Any thoughts greatly appreciated!