inducer / pytato

Lazily evaluated arrays in Python
Other
12 stars 16 forks source link

Communication nodes in pytato #55

Open matthiasdiener opened 3 years ago

matthiasdiener commented 3 years ago

From the discussion with @inducer (see also #54):

inducer commented 3 years ago

If we make separate send and receive nodes (which feels like the right approach to me), a complication that arises is that a "send node" is a sink, i.e. it doesn't have a "result", and nobody (at least on the local node) depends on its completion. That means, it has nowhere obvious to go within the DAG. We could just randomly shove send nodes into the Namespace and be done with it, but that feels inelegant. Another option would be to attach (as "fake dependencies", in a way) to receive nodes (since, at least in DG, much of our communication is paired sends and receives, though I can think of counterexamples... think 1D advection, which only needs to communicate in one direction). I'm not enthusiastic about either of the two options.

@kaushikcfd @rwerger @vincentwells, I want to make sure that you're part of this discussion early on, to make sure we pick a direction that you're OK with.

kaushikcfd commented 3 years ago
  1. Receive nodes fit naturally within pytato as they could be thought of as data wrappers but initialized via callbacks instead of concrete data.
  2. Send nodes in a namespace are weird: as "sends" are a characteristic of a computation and not the namespace (i.e. how would we realize which expression to tie the send to if multiple expressions are dangling from a namespace). I propose realizing "send" nodes via tags:
    
    rank = comm.get_rank()
    size = comm.get_size()
    x = pt.make_placeholder(shape=(10,), dtype=float)
    left_bnd = pt.tag(x[0], pt.MPISendTag(to=(rank-1)//size, tag="left")
    rght_bnd = pt.tag(x[9], pt.MPISendTag(to=(rank+1)//size, tag="right")
    left_halo = pt.mpi_recv(from=(rank-1)//size, tag="right", shape=(), dtype=float)
    rght_halo = pt.mpi_recv(from=(rank+1)//size, tag="left", shape=(), dtype=float)
    y = advect(x, left_halo, rght_halo)
    pt.generate_loopy({"left_bnd": left_bnd, "rght_bnd": rght_bnd, "y": y})  # send nodes are "outputs"

The tags identifiers are a hint to the code-generator about the dependencies between the send and receive nodes.
kaushikcfd commented 3 years ago

With the same "send via tags" interface an upwind scheme advection could be realized as:

rank = comm.get_rank()
size = comm.get_size()
x = pt.make_placeholder(shape=(10,), dtype=float)
bnd = pt.tag(x[0], pt.MPISendTag(to=(rank-1)//size, tag="halo"))
halo = pt.mpi_recv(from=(rank+1)//size, tag="halo", shape=(), dtype=float)
y = advect(x, halo)
pt.generate_loopy({"bnd": bnd, "y": y})
inducer commented 3 years ago

Thanks for the suggestion! I agree that the namespace thing is out, especially given that we were leaning towards getting rid of the namespace anyway.

As a result, I agree that the sends need to find their way to code generation in some way.

As a general comment, I would name the nodes Send (or maybe DistributedSend? RankSend?) rather than MPISend, given that the actual implementation may not be MPI.

As for tagging, I can see why it's attractive. It solves the problem that the result of a send is "nothing" (definitely not an array, at least). However, I wonder whether I like the idea of burying semantically relevant information in metadata. In addition, we also have other ways of modeling "nothing". An empty DictOfNamedArrays maybe?

I do wonder whether it will be awkward in terms of code structure to pass "send results" all the way through to code generation (through otherwise purely arithmetic code). I wonder if it would be legitimate to allow "sticking" the send result onto "just about any" node in the graph, via a tag, and have a preprocessing pass collect those send results.

kaushikcfd commented 3 years ago

The way I'm thinking about this is: send is not really a semantically relevant information as far as the current rank is considered.

I do wonder whether it will be awkward in terms of code structure to pass "send results" all the way through to code generation (through otherwise purely arithmetic code).

I'm sorry but I don't see how else would we tell pytato that a sub-graph is to be realized as "send". If you have an example in mind, could you please add that in.

kaushikcfd commented 3 years ago

On discussion with @matthiasdiener, another question that needs some discussion is whether we should do this entirely in Pytato or should we require the target (for ex. loopy) to provide the point-to-point communication primitives. I think we can entirely do this in pytato with a BoundProgram containing multiple loopy programs and BoundProgram.__call__ orchestrating the comms. Anyone notice anything wrong with the idea?

inducer commented 3 years ago

The way I'm thinking about this is: send is not really a semantically relevant information as far as the current rank is considered.

I think I understand what you're saying, but I disagree. The send info is semantically relevant in that it aids in describing a larger graph (that, ifyou want to think of it this way, stretches across nodes). Metadata on the other hand are kind of more advisory in nature to me.

I'm sorry but I don't see how else would we tell pytato that a sub-graph is to be realized as "send". If you have an example in mind, could you please add that in.

Sure! We could simply make a DistributedSend node that sends its child. The example also gives a sense of the AdditionalOutput tag.

rank = comm.get_rank()
size = comm.get_size()
x = pt.make_placeholder(shape=(10,), dtype=float)
bnd = pt.DistributedSend(x[0], to=(rank-1)%size, tag="halo")
halo = pt.DistributedRecv(from=(rank+1)%size, tag="halo", shape=(), dtype=float, tags=pt.AdditionalOutput(bnd, prefix="send"))
y = advect(x, halo)
pt.generate_loopy({"y": y})

whether we should do this entirely in Pytato or should we require the target (for ex. loopy) to provide the point-to-point communication primitives.

I agree that keeping communication in Pytato for now is a good option. Otherwise, we need to add communication primitives in two places (loopy and here) instead of one. In addition, if we decide to, we can always change our mind on this and push them further down if needed.

kaushikcfd commented 3 years ago

The send info is semantically relevant in that it aids in describing a larger graph (that, if you want to think of it this way, stretches across nodes)

But I thought pytato's DAGs were rank-local (there is no rank that can access the "entire" DAG as per the library's interface and each rank constructs its own DAG) although the computation itself might extend beyond it.

I like the snippet you shared, except one nit about the AdditionalOutput tag. IIUC AdditionalOutput is used to indicate the correct dependencies in the generated code. Could those be inferred via DistributedSend/Recv.tag (i.e. "halo") itself, and what's the use of AdditionalOutput.prefix?

inducer commented 3 years ago

But I thought pytato's DAGs were rank-local (there is no rank that can access the "entire" DAG as per the library's interface and each rank constructs its own DAG) although the computation itself might extend beyond it.

This is true! However, "semantically"/"psychologically" all ranks together construct a joint DAG, even if that DAG never exists in one place/on one rank as a data structure. Considering that (imagined) DAG, the sends and receives are semantically relevant. (i.e. not just advisory)

except one nit about the AdditionalOutput tag. IIUC AdditionalOutput is used to indicate the correct dependencies in the generated code.

The AdditionalOutput explicitly doesn't convey any dependencies. After all, the send doesn't have any (local) dependencies. The main purpose of the AdditionalOutput tag is to inform the code generator that it should include an additional output in its output dict. Since we don't have good control over name uniqueness in that output dict (and really don't need to for this), it supplies a prefix rather than a final name.

I'm aware that I'm trying to explain some subtle points. If I'm not being clear, feel free to ask!

inducer commented 3 years ago

The design in https://github.com/inducer/pytato/issues/55#issuecomment-808592686 would be banned by #59.

inducer commented 3 years ago

During group meeting just now, @kaushikcfd voiced an interesting idea for where to put the "send" nodes: Don't! I.e. assume that all communication happens "paired", i.e. sends and receives don't occur in isolation. In that setting, unidirectional coummuication could still be modeled as an exchange pair where one of the directions has a zero size. (@kaushikcfd, you also mentioned that you had previously proposed this and felt like it got rejected... upon rereading the issue, I'm not sure I saw where that would have been, but I'd be grateful to be pointed in the right direction, to make sure there's not other stuff I'm overlooking.)

I initially rather liked this proposal because it creates an "obvious" place for the send to live when communication is paired. But in the "unpaired" case, it's really mostly equivalent to having the send "return" a zero scalar. (Returning empty arrays seems appealing until you try numpys broadcasting around them. IMO, there's no reason for it to error, but it sure does error.)

@matthiasdiener @kaushikcfd What do you think?

kaushikcfd commented 3 years ago

you also mentioned that you had previously proposed this and felt like it got rejected... upon rereading the issue, I'm not sure I saw where that would have been, but I'd be grateful to be pointed in the right direction, to make sure there's not other stuff I'm overlooking

Yep, looks like I remembered it incorrectly. (Sorry!)

it's really mostly equivalent to having the send "return" a zero scalar

Aah yep, hadn't though deeply here.

How about we make it the user's problem to make sure that the send nodes are part of the DAG. I.e. if "Send" node's not part of the DAG, tough luck we don't send it.

I think we can generate the correct code for the upwind advection problem as sketched below and the user-interface isn't too awful.

rank = comm.get_rank()
nranks = comm.get_size()

u_prev_timestep = pt.make_data_wrapper(init_cond_np_array)

if rank < (nranks - 1):
    # indices_to_send is any valid index expression (basic/advanced)
    u_prev_timestep = pt.mpi.send_to_rank(u_prev_timestep,
                                          indices_to_send=(-1,),
                                          to=rank+1,
                                          mpi_tag="upwind_halo")
if rank != 0:
    halo = pt.mpi.recv_from_rank(shape=(1,), dtype=np.float64, from_=rank-1, mpi_tag="upwind_halo")
else:
    halo = get_boundary_condition(...)

u_new = u_prev_timestep + dt * diff(u_prev_timestep, halo)
inducer commented 3 years ago

How about we make it the user's problem to make sure that the send nodes are part of the DAG. I.e. if "Send" node's not part of the DAG, tough luck we don't send it.

IMO, I think that's kind of a nonstarter. In grudge, sends happen deep inside the flux exchange. If we required the sends to be returned separately, the interface of that would have to change, the return value would no longer support arithmetic, and it would be the same ugly song and dance all the way up the stack (~5 layers).

kaushikcfd commented 3 years ago

In grudge, sends happen deep inside the flux exchange

I think that should be fine, as the following code doesn't seem too intrusive to me:

def flux(tpair):
    u_int = pt.mpi.send_to_rank(u, indices=..., rank=...,)
    u_ext = u.ext
    central_flux = 0.5 * (u_int + u_ext)
    return discr.project(central_flux, .., ...)
inducer commented 3 years ago

I disagree. As a simple exmaple, consider the wave operator.

In mirgecom, these calls are even more layers deep. You're telling me you want to change these interfaces all to return tuples, so that the interface either drags around a pointless empty entry for sequential execution, or is inconsistent between distributed and sequential?

kaushikcfd commented 3 years ago

Practically, if we just make the "local_dof_array" in https://github.com/inducer/grudge/blob/2af3528391bf4cb07f5aacf43f7c6afcc18f693a/grudge/trace_pair.py#L317 to be "sent" array, it should solve all our problems. As this trace pair's interior component will be a part of the expression graph for almost all our use cases?

inducer commented 3 years ago

it should solve all our problems.

How does that solve returning tons of send nodes through a zillion layers of API?

kaushikcfd commented 3 years ago

How does that solve returning tons of send nodes through a zillion layers of API?

Because the sent array will be a part of the expression graph now as it will be returned as the interior component of the TracePair after finish has been called.

kaushikcfd commented 3 years ago

After discussing with @inducer it was pointed out that the interface proposed in https://github.com/inducer/pytato/issues/55#issuecomment-944789060 isn't ideal as forcing the receiving rank to construct the derived expressions only from the sub-expression useful to the sender rank is too restrictive and has potential to increase the communication volume for certain applications. So, let's remove it from the set of contenders.