cplusplus / sender-receiver

Issues list for P2300
Apache License 2.0
20 stars 4 forks source link

Cooperative semantic #91

Open ericniebler opened 11 months ago

ericniebler commented 11 months ago

Issue by gevtushenko Sunday Feb 27, 2022 at 01:05 GMT Originally opened as https://github.com/NVIDIA/stdexec/issues/475


A few crucial areas that P2300 can cover require clarification on cooperative semantics. Cooperative API involves multiple threads working together towards a shared aim. For instance, let's consider the following function:

void f(int tid, auto &scheduler) {
  auto snd = schedule(scheduler) 
           | then([tid]{printf("{t%d}", tid);}) 
           | bulk(2, [tid](int i){printf("{b%d:%d}", tid, i);});

    printf("~");
    sync_wait(snd); 
}

If two threads execute the code above with an inline scheduler f(tid, inline_scheduler), we'll get some interleaving of the following characters:

~~{t0}{t1}{b0:0}{b0:1}{b1:0}{b1:1}

In other words, then is executed by each thread as well as bulk, which is expected. On the contrary, an inline cooperative scheduler f(tid, inline_coop_scheduler) would lead to the following result:

~~{t0}{b0:0}{b1:1}

Here then is specialized to execute work only once and bulk distributes work between participating threads. This approach allows representing cooperating threads as a single execution context without the overhead of task queue maintenance.

motivation

  1. distributed context:

    Let's consider the following sender adaptor:

    sender auto compute(auto &computer) {
       return schedule(computer) 
            | bulk(n_cells, process_cell)
            | then(done_once)
            | transfer(inline_scheduler{})
            | then(write);
    }

    If it adapts an inline scheduler, calling thread processes n_cells. Thread pool scheduler represents a set of threads as a single execution resource, so then would be executed once and, bulk would process n_cells in a federated manner to achieve some speedup. Extending this idea, we came to a distributed scheduler. This scheduler would partition n_cells between multiple nodes of a distributed system. Although task-based programming model is a known approach for distributed programming models, static information can improve performance by reducing tasks distribution. This leads us to a cooperative distributed scheduler:

    int main() {
       // Access runtime to query process id and number of processes
       coop_distributed_scheduler scheduler{}; 
       sync_wait(scheduler);
    }
    
    // mpirun -np 2 ./compute

    Note that we can achieve the effect of performing then by each cooperating executor by transfer-ing to an inline_scheduler.

  2. locality:

    Assigning a thread to a particular execution resource might reduce the number of context switches, which affects performance. For the code above, we might use a multi-GPU scheduler:

    int main() {
       // Switches between GPUs internally
       multi_gpu_scheduler scheduler{}; 
       sync_wait(scheduler);
    }

    Performance might be improved if we assign a thread to a particular GPU:

    int main() {
       #pragma omp parallel 
       {
         // No GPU context switches
         coop_multi_gpu_scheduler scheduler{}; 
         sync_wait(scheduler);
       }
    }
  3. nesting:

    The following code represents a case of executing a sender in cooperative and inline contexts. It's expected to get the same result in cases (1) and (2) without the overhead of dealing with a task queue.

    assert(get_forward_progress_guarantee(scheduler) == concurrent);
    
    sync_wait(
     schedule(scheduler)
    | bulk(2, [](int thread_num) {
       inline_cooperative_scheduler sub_scheduler{thread_num, 2};
    
       // per-thread prologue
       sync_wait(schedule(sub_scheduler) | compute());    // 1
       // per-thread epilogue
     })
    );
    
    sync_wait(schedule(scheduler) | compute());            // 2

Having inline behavior in those contexts would change the sender's behavior. Providing cooperative versions of then and bulk would limit code reuse since a sender author would have to know if they are developing code for a cooperative context.

goals

ericniebler commented 11 months ago

Comment by kirkshoop Sunday Feb 27, 2022 at 20:12 GMT


I have re-read this a few times and I think that I am getting closer to understanding the scenarios here. Still not completely sure though.

nesting sync_wait in an expression breaks the expected behaviour of a sender expression. nested execution is expected to be composed into the expression such that no execution agent is blocked waiting for another.

Are there ways to express this cooperation idea that do not use inline schedulers? inline is already hard to reason about and what an inline-cooperative-scheduler does is still a mystery to me.

Is there code that demonstrates an inline-cooperative-scheduler?

Here then is specialized to execute work only once and bulk distributes work between participating threads.

Is f() still called twice on two different threads in this case?

ericniebler commented 11 months ago

Comment by LeeHowes Monday Feb 28, 2022 at 18:54 GMT


Ignoring the fact that any example using an inline scheduler makes me wary. Let's assume instead that this is a cooperative thread pool scheduler, which can include the current thread.

The semantic of executing then and bulk once is scary. That is not at all obvious.

I could imagine though that a thread pool scheduler would distribute, including to the original threads. The problem is that sync_wait introduces a new scheduler - it would be a fairly big step to make a general formulation where sync_wait somehow allows the caller to give itself up in a way that allows sync_wait to distribute to other threads in the thread pool that the caller blocked from. Possible, using thread_locals but not general.

I don't really see why you would want to do this, though. We shouldn't encourage blocking now we have coroutines. If instead your code is written:

task<void> f() {
  auto tid = co_thread_id();
  auto snd = get_scheduler() | then_value([](auto sch) {
      return schedule(sch) 
           | then([tid]{printf("{t%d}", tid);}) 
           | bulk(2, [tid](int i){printf("{b%d:%d}", tid, i);});
    });
    printf("~");
    co_await(snd); 
}

...
static_thread_pool thread_pool{2};
sync_wait(on(thread_pool, when_all(f(), f()));

where get_scheduler() does what it does in P2300 and passes the scheduler from the receiver down the value channel, then this works cleanly. We could simplify this with schedule() heading a chain directly from the scheduler it gets from the receiver, or by having just bounce back the completion scheduler from the receiver, I forget if we defined it that way. It's cleaner in any case because we're not passing schedulers around.

Each instance of f yields, so isn't consuming a thread, and all of this will run on whatever scheduler f was running on. That should be cleanly cooperative across the two threads of the thread pool.

Given that, is there anything actually missing here or is it a problem created by trying to block in async code and using inline schedulers?

All of that said, I do think we should be using structured primitives more, that naturally fit patterns like this. I'd rather not see schedule and transfer in very many examples at all, because they tend to imply poorly structured concurrency.

ericniebler commented 11 months ago

Comment by jrhemstad Thursday Mar 03, 2022 at 17:32 GMT


You'll have to forgive my ignorance as I'm still a P2300 novice but I do have quite a bit of experience in what we are calling "cooperative" style code (and not just CUDA). I've written the same code in MPI:

int main(){
   auto my_rank = MPI_Comm_rank(MPI_COMM_WORLD);
   auto num_ranks = MPI_Comm_size(MPI_COMM_WORLD):
   entire_program(my_rank, num_ranks);
}

or OpenMP:

int main(){
#pragma omp parallel{
   auto thread_id = omp_get_thread_num();
   auto num_threads = omp_get_num_threads();
   entire_program(thread_id, num_threads);
}
}

and finally CUDA:

__global__ void kernel(...){
   auto thread_id = threadIdx.x + blockIdx.x * blockDim.x;
   auto num_threads = blockDim.x * gridDim.x;
   entire_program(thread_id, num_threads);
}

The broader community would refer to this as "SPMD" parallelism, or "Single Program, Multiple Data" where multiple execution agents execute the exact same code on different inputs. This is how I first learned parallel computing and it is still extremely common in not just CUDA, but MPI and OpenMP.

This is a very common way to express parallelism in performance-sensitive circles because it still usually gives the best performance. Fine-grained, dynamic task decomposition with complex task graphs and task queues can be more elegant and expressive, but the fact is that for many problems a static, coarse-grain decomposition is still faster as it improves data locality and reduces scheduling and communication overheads. (A few papers for reference: 0, 1, 2, 3. Forgive me for not having more recent papers, but I've been out of the academia game for a while :smile: ). A quote from 0 that summarizes things nicely:

Our experiments, on three different multicore architectures using the threading functionality provided by OpenMP, show that even though multicore architectures allow for fine grain task decomposition and frequent synchronization, the best performance is achieved when the task decomposition is coarse and synchronization is infrequent. In addition to this we found that data locality, which when using OpenMP can only be controlled implicitly by having threads operate on the data they generate, is also crucial to achieving performance on a large number of cores.

I think it would dramatically limit the applicability and adoption of the S&R model if it doesn't allow for SPMD parallelism.

I haven't quite wrapped my head around what the concerns are with the semantics of a cooperative scheduler. I imagine a very simple example like:

int main(){
    auto sch = coop_mpi_scheduler{}; // represents running on MPI_Comm_size(MPI_COMM_WORLD) processes
    auto print42 = schedule(sch) | then( []() { return 42; } ) | bulk(num_ranks, [](int i){ std::cout << i << std::endl; };
    sync_wait(print42);
}

Doing mpirun -n 4 would print 42 four times.

The equivalent MPI code would look something like:

int main()
   int i = (rank == 0) ? then_fn() : 0; // Only rank0 executes `then_fn`
   MPI_Broadcast(&i);                   // Communicates the result of `then_fn` to all ranks
   bulk_fn(i);                          // All ranks execute bulk_fn
   MPI_Barrier(...);                    // Synchronize all ranks
}

Could someone help me understand what the concerns are with this code or is it too simple to capture the potential problems?

ericniebler commented 11 months ago

Comment by jrhemstad Thursday Mar 03, 2022 at 19:24 GMT


Re-reading P2300, I am reminded that the inclusive scan example is much less contrived than my example above. It is already written in a very nearly SPMD-style.

I would want to be able to map that algorithm to something like this in MPI:

auto const rank = MPI_Comm_rank(MPI_COMM_WORLD);
auto const num_ranks = MPI_Comm_size(MPI_COMM_WORLD);
auto const tile_size = ceil(input.size()/num_ranks);
auto my_input = input.subspan(rank * tile_size, tile_size);
auto my_output = output.subspan(rank * tile_size, tile_size);

auto my_partial = *std::prev(std::inclusive_scan(begin(my_input), end(my_input), begin(my_output));

auto partials = (rank == 0) ? std::vector<double>(num_ranks+1) : {};

MPI_Gather(/* my_partial, partials.data() */); // All ranks send their partial to rank 0

if( rank == 0){
   std::inclusive(begin(partials), end(partials), begin(partials));
   for(auto const& p : partials)
      MPI_ISend( /*p, i*/); // Rank 0 sends `partial[i]` to rank `i`
} else {
   MPI_Recv(/* my_partial */); // Receive partial from rank 0 and overwrite my_partial
}

std::transform(begin(my_output), end(my_output), begin(views::repeat(my_partial)), begin(my_output), std::plus{});

It is important for the performance of this algorithm that the agent that computes the initial inclusive_scan on chunk i is the same agent that performs the elementwise summation of my_partial to exploit both NUMA and cache data locality. The easiest way to do this is with a simple, static 1D decomposition where a single agent is responsible for all of the work on a particular tile.

Now, you could do this instead with a dynamic task scheduler with a centralized task queue containing the independent tasks for computing a tiles inclusive_scan and then elementwise sum. Creating that queue and scheduling the tasks onto agents introduces overhead that does not exist in the above solution. Furthermore, if your task scheduling isn't locality aware, it's possible two different execution agents would execute the inclusive_scan and transform tasks and spoil the locality benefits of doing them on the same agent.

ericniebler commented 11 months ago

Comment by LeeHowes Thursday Mar 03, 2022 at 20:08 GMT


Having:

int main(){
    auto sch = coop_mpi_scheduler{}; // represents running on MPI_Comm_size(MPI_COMM_WORLD) processes
    auto print42 = schedule(sch) | then( []() { return 42; } ) | bulk(num_ranks, [](int i){ std::cout << i << std::endl; };
    sync_wait(print42);
}

print 42 4 times is what you expect. The question is how many times schedule and then actually run. If we hide the SPMD part of this somewhere so that we run the cout distributed, but the return 42 is on some master task, then it is easy to visualise and makes perfect sense. I think we could define it all that way if we are running just the SR parts of the program in an MPI-conscious way, and the scheduler runs scalar things once somewhere and bulk things distributed.

The problem as described though is that schedule and then actually run multiple times as well, because all of main is running in SPMD fashion. They aren't magically SPMD-aware. Additionally, if those are implemented in a default fashion it isn't at all obvious how to filter the scalar tasks out: when set_value is called, when schedule's sender completes, it will complete on every SPMD agent not just on the first. If we customise all of this then a scheduler implementation can do what it likes and the SPMDness is invisible to the model.

We can also make it work if we propagate the agent and filter in the code:

int main(){
    auto sch = coop_mpi_scheduler{}; // represents running on MPI_Comm_size(MPI_COMM_WORLD) processes
    auto print42 = schedule(sch) | then( [](auto agent) { if(agent == 0) {return bigWork();} else {return {};} ) | bulk(num_ranks, [](int i){ std::cout << i << std::endl; };
    sync_wait(print42);
}

and assume that bulk customises such that it bridges all the agents. That's messy, though.

The other way to approach this is a pure work queue model:

int main(){
    auto sch = coop_mpi_scheduler{}; // represents running on MPI_Comm_size(MPI_COMM_WORLD) processes
    if(sch.agent() == 0) {
      auto print42 = schedule(sch) | then( [](auto agent) { if(agent == 0) {return bigWork();} else {return {};} ) | 
  bulk(num_ranks, [](int i){ std::cout << i << std::endl; };
    }
    sch.donate();
}

Where all threads donate themselves to the cooperative scheduler, but only one agent actually runs the program that builds the graph. That will not incorporate so well into the rest of an MPI program, and would presumably be less efficient, but has more obvious execution semantics for unspecialised code.

ericniebler commented 11 months ago

Comment by kirkshoop Thursday Mar 03, 2022 at 20:30 GMT


Perhaps the mapping of existing cooperative code would be easier with a mental model of sender receiver that Lewis has championed.

Consider a sender to be a function.

Connect on a sender creates a frame(operation state).

Start on an operation state enters the function.

Set_value, set_error and set_stopped on the receiver exit the function with a result, an 'exception' or an unwind.

In this model, instead of trying to use sender receiver inside entire_program, try converting entireprogram into a sender that stores local vars in the operation state, begins work in start, and calls set.. to exit.

For each nested operation in entire_program, attempt to convert that function into another sender then compose the senders together. Once we have a few examples of these conversions, we can use them to create a new general algorithm that can be used to express these conversions with less boilerplate.

ericniebler commented 11 months ago

Comment by jrhemstad Monday Mar 07, 2022 at 15:25 GMT


The problem as described though is that schedule and then actually run multiple times as well, because all of main is running in SPMD fashion. They aren't magically SPMD-aware. If we customise all of this then a scheduler implementation can do what it likes and the SPMDness is invisible to the model.

Right, the whole chain of SRs would be executed on every agent. So the question is, what do operations like then do when executed N times?

My understanding is that coop_mpi_scheduler would customize then to only execute the callable on a single agent and communicate the result to all the other agents (the MPI_Broadcast in my example).

So is the primary concern here that a cooperative scheduler would conceivably have to customize every S&R algorithm?

ericniebler commented 11 months ago

Comment by LeeHowes Monday Mar 07, 2022 at 17:29 GMT


So is the primary concern here that a cooperative scheduler would conceivably have to customize every S&R algorithm?

I think so, yes. This is the same concern that led us to the quirky completion_scheduler mechanism and customising on that instead of the algorithm.

ericniebler commented 11 months ago

Comment by vossmjp Monday Mar 07, 2022 at 17:39 GMT


It seems like some of the previous suggestions are about how to write an SPMD program using senders-receivers, but I don’t think that’s the issue. The problem we are trying to solve here is how to write code that doesn’t include thread-id/rank checks, yet supports both cooperative and non-cooperative schedulers, and is easy to reason about in isolation. The main challenge is that the SR chain is built in an implicitly nested context in an SPMD program – N threads/jobs execute the functions -- while in a non-cooperative context only a single thread builds the chain. My experience is that SPMD code looks like it is SPMD; whether it’s an MPI program or an OpenMP SPMD-style program, there are checks for thread-id or rank etc. throughout the code. In P2300, thenis specified as returning a sender than invokes f; otherwise, it is ill-formed. Are we considering allowing customizations of thenthat sometimes return senders that do not invoke f?

ericniebler commented 11 months ago

Comment by ericniebler Monday Apr 04, 2022 at 21:06 GMT


I'm late to this party. @kirkshoop wrote:

For each nested operation in entire_program, attempt to convert that function into another sender then compose the senders together. Once we have a few examples of these conversions, we can use them to create a new general algorithm that can be used to express these conversions with less boilerplate.

I don't understand this part. It sounds like you want to transform the sender expression tree into ... what exactly?

@vossmjp wrote:

Are we considering allowing customizations of then that sometimes return senders that do not invoke f?

That's just it, I don't think we can, and I don't think we should. The algorithms need to have well-defined semantics, even for customizations. This seems a bit outside the purview of vanilla S&R as we have been imagining it. It needs some serious thought. 🤔

ericniebler commented 11 months ago

Comment by kirkshoop Tuesday Apr 05, 2022 at 19:18 GMT


@ericniebler I was trying to describe how to approach the reverse. In order to understand how to represent the existing cooperative code as senders, use the existing functions in code that works cooperatively today (the 'main' function and each function that are called by a function) as the structure for the sender representation of the same cooperative work. This should prevent the issues around algorithms that are only run once - etc..