flux-framework / flux-core

core services for the Flux resource management framework
GNU Lesser General Public License v3.0

synchronous RPCs block the flux reactor #126

Closed garlick closed 9 years ago

garlick commented 9 years ago

flux_json_rpc(), and calls built on it such as kvs_get() (one of many), send a request and then block until a response is received.

In reactor based programs, this means other events cannot be handled during the RPC and that thread effectively stalls while the RPC is pending.

In one sense this is a feature, e.g. a call like kvs_get() or kvs_commit() is effectively atomic from the perspective of the caller. However, it also creates several practical problems:

Should there be a scheme for telling the handle/reactor what messages are higher priority than an RPC response and when they arrive, have the RPC return with EINTR?

grondo commented 9 years ago

Ideally there would be asynchronous variants of the various calls for pure event-driven code. Otherwise we will run into bad situations when callers inadvertently block in an otherwise non-blocking application.

Another crazy idea would be to allow kvs_get() and other "blocking" calls to optionally use the current handle's reactor loop to "wait" for their response, instead of a blocking recv. This would allow other events to be processed as they come in, and would require a lot less "special case" code to handle EINTR or other one-off errors from blocking code used in an async application.

In my naive view, this makes these blocking calls more like coroutines that, in some ways, yield back to the async application while they essentially block waiting for a response. I think that model might work ok, especially if you have to "opt in" to the behavior so that the application author is aware. (I would propose that the opt-in be a property of the flux handle).

Either way, this feels like tricky ground and requires some thought.

garlick commented 9 years ago

To recap our face to face discussion:

That would be neat. However, I don't think we can do that in C without evil hackery like this or clumsy "function restart" mechanisms like the KVS's waitqueue abstraction

It looks like we could neatly implement in lua.

Did I capture what we talked about?

After writing this, I find it hard to let this idea go as "not possible in C". It would be tremendously nice to have it or an approximation.

grondo commented 9 years ago

Yes, and sorry for being so dense.

To clarify, rather than "real" coroutine support I was suggesting that if we had support for nested reactor loops we could do something like coroutines (kind of a yield back to the async application, i.e. events, messages, and signals of interest could be handled while you are "blocked" waiting for the synchronous operation to complete)

For example, libev supports nested loops, presumably because on each entry to a libev loop it rebuilds the pollfds or equivalent, schedules the next timer, etc., all from scratch. There is a function ev_loop_depth (named ev_depth in libev 4) to check the current nesting depth, and the ev_break function supports EVBREAK_ONE to break out of the current loop, or EVBREAK_ALL to break out of all nested loops.

How this might work in a Flux application is that a (possibly nested) call to flux_reactor_start() would replace the blocking recv inside of flux_rpc calls. Thus, while waiting for the message of interest inside of flux_rpc, all other application callbacks are handled normally, including response to shutdown events, signals, etc. Once the reply to the rpc arrives, the handler for that specific message (presumably using matchtag) would return from the inner loop with flux_reactor_stop (FLUX_STOP_ONE) or similar, and return normally from the flux_rpc call.

This is similar to the libev suggestion for handling modal dialogues in UI apps.

As we discussed, the problem here is that you could arbitrarily recursively call flux_reactor_start() pretty easily, so that would have to be managed somehow with a maximum depth parameter.

I was also advocating that perhaps this behavior could be triggered only when a certain flag was set in the flux handle.

However, this is far from a perfect solution and there are probably other problems with it I'm not seeing, etc. (But I wanted to continue to document our thoughts here.)
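The nested-loop idea in this comment can be illustrated with a toy event loop. This is only a minimal sketch under stated assumptions: it uses neither the Flux nor the libev API, and every name in it (toy_reactor, toy_rpc, MAX_DEPTH, and so on) is hypothetical. The "RPC" re-enters the loop instead of blocking, a matching response stops only the innermost loop, and a depth cap bounds the recursion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_EVENTS 16
#define MAX_DEPTH  4            /* hypothetical cap on nested loop entries */

enum toy_type { TOY_REQUEST, TOY_RESPONSE, TOY_TIMER };

struct toy_event { enum toy_type type; int matchtag; };

struct toy_reactor {
    struct toy_event queue[MAX_EVENTS];
    size_t head, tail;
    int depth, max_depth;       /* current and deepest nesting seen */
    int wanted_tag;             /* matchtag the innermost RPC waits for */
    bool stop_one;              /* ask only the innermost loop to return */
    int rpcs_completed, others_handled;
};

bool toy_reactor_run(struct toy_reactor *r);

/* Stand-in for an RPC: instead of a blocking recv, re-enter the loop
 * until the response carrying `tag` is dispatched. */
int toy_rpc(struct toy_reactor *r, int tag)
{
    if (r->depth >= MAX_DEPTH)
        return -1;              /* too deep; real code might block instead */
    int saved = r->wanted_tag;
    r->wanted_tag = tag;
    bool got_reply = toy_reactor_run(r);
    r->wanted_tag = saved;
    return got_reply ? 0 : -1;
}

void toy_dispatch(struct toy_reactor *r, struct toy_event ev)
{
    switch (ev.type) {
    case TOY_REQUEST:           /* this handler needs a reply to proceed */
        if (toy_rpc(r, ev.matchtag) == 0)
            r->rpcs_completed++;
        break;
    case TOY_RESPONSE:          /* unmatched responses are dropped in this toy */
        if (ev.matchtag == r->wanted_tag)
            r->stop_one = true;
        break;
    case TOY_TIMER:             /* unrelated work serviced while "blocked" */
        r->others_handled++;
        break;
    }
}

/* Returns true if the loop was stopped by a matching response,
 * false if it simply ran out of events. */
bool toy_reactor_run(struct toy_reactor *r)
{
    bool stopped = false;
    if (++r->depth > r->max_depth)
        r->max_depth = r->depth;
    while (r->head != r->tail) {
        toy_dispatch(r, r->queue[r->head++]);
        if (r->stop_one) {
            r->stop_one = false;
            stopped = true;
            break;
        }
    }
    r->depth--;
    return stopped;
}

/* One request whose reply arrives after an unrelated timer event. */
struct toy_reactor toy_demo(void)
{
    struct toy_reactor r = { .wanted_tag = -1 };
    r.queue[r.tail++] = (struct toy_event){ TOY_REQUEST, 7 };
    r.queue[r.tail++] = (struct toy_event){ TOY_TIMER, 0 };
    r.queue[r.tail++] = (struct toy_event){ TOY_RESPONSE, 7 };
    toy_reactor_run(&r);
    return r;
}
```

Calling toy_demo() runs the request handler at depth 1, services the timer event at depth 2 while the RPC is pending, and then unwinds one level when the tagged response arrives. A real implementation would also have to decide what to do with responses that match an outer, not the innermost, pending RPC.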

garlick commented 9 years ago

It just occurred to me that if we could get this working, the kvs server-end code could be dramatically simplified. Over there, in order to do a kvs.get, we have to walk the namespace recursively, mapping each path component to an object store entry, calling kvs.load for any objects not in cache. To avoid stalling we have a wait queue associated with each "in progress" cache value, so that when a response is received and a cache entry is instantiated, all functions that were waiting on it are restarted. For a given kvs.get, the internal handler may be restarted several times, once for each cache miss. This code is a little hard to follow and inefficient.

In the new model, a kvs.get handler could just call kvs.load (to retrieve hashed objects) in a loop. That would be a dramatic simplification.
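To make the difference concrete, here is a toy comparison of the two styles. It is a sketch under assumptions, not the actual KVS code: the cache is a plain array, a "load" is simulated by flipping a flag, and all names (toy_cache, get_restartable, demo_restart, demo_loop) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define NCOMP 3                 /* path components, e.g. "a.b.c" */

struct toy_cache {
    bool present[NCOMP];        /* is the object for component i cached? */
    int lookups;                /* cache probes performed */
    int loads;                  /* load requests issued */
    int handler_calls;          /* times the get handler was entered */
};

/* Restart style: on the first miss, request a load and bail out.
 * Returns the missing component index, or NCOMP when the walk finished. */
int get_restartable(struct toy_cache *c)
{
    c->handler_calls++;
    for (int i = 0; i < NCOMP; i++) {
        c->lookups++;
        if (!c->present[i]) {
            c->loads++;
            return i;
        }
    }
    return NCOMP;
}

/* Driver standing in for the wait queue: each "response" fills the cache
 * entry and restarts the handler from the top. */
struct toy_cache demo_restart(void)
{
    struct toy_cache c = { 0 };
    int miss;
    while ((miss = get_restartable(&c)) < NCOMP)
        c.present[miss] = true;
    return c;
}

/* Loop style: the load is an RPC that (hypothetically) yields until the
 * reply arrives, so the walk simply continues where it left off. */
struct toy_cache demo_loop(void)
{
    struct toy_cache c = { 0 };
    c.handler_calls++;
    for (int i = 0; i < NCOMP; i++) {
        c.lookups++;
        if (!c.present[i]) {
            c.loads++;
            c.present[i] = true;    /* reply received, entry now cached */
        }
    }
    return c;
}
```

With a cold cache and three path components, the restart style enters the handler four times and probes the cache nine times, while the loop style enters once and probes three times. Both issue the same three loads.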

garlick commented 9 years ago

Looking at the zloop code, I'm not sure there is any reason why a reactor callback couldn't call zloop_start() via flux_reactor_start(). We should try your suggested modification to flux_rpc() and if that works, explore how to limit recursion.

garlick commented 9 years ago

Checkpoint for the weekend:

zloop won't recurse after all. I'm playing with some patches here:

https://github.com/garlick/czmq/tree/zloop-recurse

but I wonder if migrating to libev might be a better move in the long run.

In a flux-core tree I haven't checked in yet, I have a modified flux_t handle that includes a new handle flag FLUX_FLAGS_RPC_REACT which modifies the behavior of flux_receive_response() to set up a callback and enter the reactor instead of directly blocking on a receive. So far my tests have some problems. Not sure if they are mine or czmq yet (probably mine).

garlick commented 9 years ago

Experiments integrating libev and setting up the reactor to recurse were parked here for now:

https://github.com/garlick/flux-core/tree/reactor-recurse

Having integrated zeromq following suggestions in this blog

http://funcptr.net/2013/04/20/embedding-zeromq-in-the-libev-event-loop/

I later found in the libev documentation that the prepare/check watchers cannot reenter the event loop recursively, which was kind of the whole point. Unfortunately, I can't see how to integrate zeromq without using those hooks.

I never figured out a strategy to limit the depth of recursion in this code.

Although these roadblocks were not fundamental, practically speaking I wasn't getting anywhere with this and needed to back away from it.

trws commented 9 years ago

Given the most recent comment, I'm not sure this will be something you want to look into, but something occurred to me as an option last night. While the POSIX standard deprecated the context switch routines in ucontext_t, making portable coroutines an issue, there is still an actively supported Boost project for context (register and stack) management and a coroutine library built on top of it. The libraries are surprisingly portable, and not bad to use. Would it make this either easier or more practical if we pulled together a C-compatible shim library to give you real yielding coroutines? I took a few minutes and wrote a short test program on my laptop that uses a C-compatible API shim on top of symmetric_coroutine<>::call_type, and it seems to work just fine. See below for the quick-and-dirty code.

#include <boost/coroutine/all.hpp>
#include <iostream>

typedef void c_coro;

typedef void(*coro_fun)(c_coro *);

class coro {
  private:
    struct BoolConversion { int dummy; };
  public:
    inline operator int BoolConversion::* ()
    {
      return co ? & BoolConversion::dummy : 0;
    }
  public:
    coro(coro_fun fun) :
      target(fun),
      co([&, this](boost::coroutines::symmetric_coroutine<void*>::yield_type& yield){
          yielder = &yield;
          target(reinterpret_cast<c_coro*>(this));
          })
    {}

    bool  operator!() const{
      return (!co);
    }

    void operator()(void* dat){
      co(dat);
    }
    void yield(){
      (*yielder)();
    }
    void* get(){
      return yielder->get();
    }

  private:
    coro_fun target;
    boost::coroutines::symmetric_coroutine<void*>::call_type co;
    boost::coroutines::symmetric_coroutine<void*>::yield_type* yielder;
};

bool call(c_coro* co, void* dat){
  coro* cpp_co = reinterpret_cast<coro*>(co);
  (*cpp_co)(dat);
  return (bool)(*cpp_co);
}

void* get(c_coro* co){
  return reinterpret_cast<coro*>(co)->get();
}
void yield(c_coro* co){
  reinterpret_cast<coro*>(co)->yield();
}

c_coro * new_coro(coro_fun fun){
  return new coro(fun);
}

void free_coro(c_coro * co){
  delete reinterpret_cast<coro*>(co);
}

void test(c_coro* current_coro){

  std::cout << "running test" << std::endl;
  std::cout << "yielding" << std::endl;
  for(int i=0; i < 5; i++){
    yield(current_coro);
    std::cout << "back in test" << std::endl;
  }
}

int main(int argc, char *argv[])
{
  c_coro * test_coroutine = new_coro(&test);

  while(call(test_coroutine, NULL)) {
    std::cout << "back in main" << std::endl;
  }
  free_coro(test_coroutine);
  return 0;
}

result:

running test
yielding
back in main
back in test
back in main
back in test
back in main
back in test
back in main
back in test
back in main
back in test

garlick commented 9 years ago

Very cool!

I need to get my head around how we could use that capability in the current code base so I have something concrete to think about.

Say we retain the reactor model, but have the event loop be a sort of "parent" coroutine and each handler be started as a "child" coroutine? When a handler completes, it simply returns to the reactor as it does now. But if before then it needs to make an RPC, the RPC code could send the request, internally register a reactor callback for the matching response, and yield to the reactor. Then when the response is received the internal callback would match the response to the sender and resume the handler.

Is that what you would call asymmetric coroutines since handlers only yield to the reactor loop?

Does the above even make sense or am I missing some more clever way to use coroutines here (probably!)
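The model described in this comment can be sketched with the ucontext calls directly. This is a minimal illustration, assuming Linux with glibc; the reactor is reduced to a loop that delivers canned responses, and all names (toy_handler, toy_rpc, toy_resume, toy_demo) are hypothetical rather than part of any Flux API.

```c
#include <assert.h>
#include <stdlib.h>
#include <ucontext.h>

#define STACK_SIZE (64 * 1024)

struct toy_handler {
    ucontext_t ctx;             /* the handler ("child") context */
    ucontext_t reactor;         /* where to go on yield or return */
    int done;
    int pending_tag;            /* matchtag the handler is waiting on */
    int response;               /* payload handed over on resume */
    int result;
};

/* makecontext() only passes int arguments, so this sketch hands the
 * handler its state through a global instead. */
static struct toy_handler *current;

/* Toy RPC: note what we are waiting for, then yield to the reactor. */
int toy_rpc(struct toy_handler *h, int tag)
{
    h->pending_tag = tag;
    swapcontext(&h->ctx, &h->reactor);  /* yield */
    return h->response;                 /* resumed by the reactor */
}

/* A handler written in plain blocking style. */
void toy_handler_body(void)
{
    struct toy_handler *h = current;
    int a = toy_rpc(h, 1);
    int b = toy_rpc(h, 2);
    h->result = a + b;
    h->done = 1;
    /* returning switches to uc_link, i.e. back to the reactor */
}

void toy_resume(struct toy_handler *h)
{
    current = h;
    swapcontext(&h->reactor, &h->ctx);
}

int toy_demo(void)
{
    struct toy_handler h = { 0 };
    int responses[] = { 0, 40, 2 };     /* canned replies, indexed by tag */
    char *stack = malloc(STACK_SIZE);
    assert(stack != NULL);

    getcontext(&h.ctx);
    h.ctx.uc_stack.ss_sp = stack;
    h.ctx.uc_stack.ss_size = STACK_SIZE;
    h.ctx.uc_link = &h.reactor;
    makecontext(&h.ctx, toy_handler_body, 0);

    toy_resume(&h);                     /* runs until the first RPC yields */
    while (!h.done) {
        /* a real reactor would service other events here */
        h.response = responses[h.pending_tag];
        toy_resume(&h);                 /* "response callback" resumes it */
    }
    free(stack);
    return h.result;
}
```

The handler only ever yields to the reactor and is only resumed by it, which matches the asymmetric arrangement asked about above. The handler body itself reads like ordinary blocking code.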

trws commented 9 years ago

What you said definitely makes sense, depending on how you want to write the children, RPCs and signals it could work like that or any number of different ways.

As a mechanism, the symmetric vs asymmetric distinction seems to be that asymmetric coroutines always yield directly to the caller, and symmetric ones can yield to any coroutine, parent or not. For example, you could have three symmetric coroutines (a,b,c) that yield a->b->c->a... Or you can store off the coroutine and come back to it. The asymmetric version seems to be designed with the goal of easily producing generators or other infinite consuming iterators, whereas the symmetric kind is more what I think of as a coroutine in other contexts.

It would probably end up being that the outer event loop would spawn each child as a coroutine, as you note. The child could then do one of two things: it could return successfully or call a blocking RPC. A successful return causes the boolean conversion of the coroutine type to return false, hence the while(call(...)) stuff in the test code. That way the reactor knows the child is done and can clear it. If it calls a blocking RPC, it can do as you say and start the request then yield. The yield would put it back in the reactor loop and, since the result of the call is true, add it to a waiting list or queue to be continued later. If the blocking RPC registers a callback, then the waiting list isn't actually necessary, but it might be useful in some cases to write something like an asynchronous polling loop that gets selected to make progress whenever it circles back around to the front of the queue.

Overall it's just a question of what model makes writing children for the reactor most straightforward for this project. Should the children be returning state on each yield, or receiving state on each call, or both? Is there a reason for a child to resume another co-routine under some circumstance? Should this save the floating point registers, or leave them off? Should the signal mask be saved (probably not, but still)? What arguments should be passed into the coroutine? The last of these is really a C interface issue, but worth considering.

garlick commented 9 years ago

Well if we want to keep it simple to start, and try to confine changes to the RPC and reactor code, then I would say it makes sense to just run handlers as coroutines, have them only yield to the reactor, and have them only resume via a callback arrangement. Signal masks could be ignored. Floating point regs should probably be preserved? The args in/args out for a coroutine should map some way onto the prototypes for reactor callbacks.

I'm worried about the need to link against boost. Assuming the approach plays out as a good one, do you know if it is feasible to extract the coroutine/context bits and just pull them into the project?

dongahn commented 9 years ago

Tom can say for sure. But I can share my experience with this approach FWIW.

As far as I know, there are some utilities (bcp?) out there that can extract the subset of boost you need. In fact, I used this approach for launchmon.

What I found, though, was that the extracted subset was pretty large even though I needed only a small piece (probably due to some complex dependencies). But this wasn't the worst part. When I asked around about this, I heard that a large project like DynInst backed away from this approach because of various side effects. E.g., when the build system already has a different version of boost and some dependent components built against it, this could lead to extremely hard-to-debug problems. If boost libraries are involved, similar version mismatch issues are also there... Backporting, or needing to reintegrate the whole of boost just to pick up some upstream fixes, was also a big issue...

For this reason, I decided not to distribute boost as part of my package. But nevertheless I integrated this into the code base only to make building launchmon easy on a system without installed boost. IOW, on a system with boost installed, I just use the system-provided one.

trws commented 9 years ago

Those all sound like reasonable choices. The masks and floating point regs work that way by default, the arguments we can adjust to our liking, I just don't know what the current callbacks use.

The boost dependency would be boost_coroutine and boost_system 1.55+, no other boost libraries would be required.

Alternately, there are a number of C libraries that offer the same or similar functionality, but I can't seem to find one that is being actively maintained. The boost version has the advantage that the authors are actively pitching it for inclusion in the C++ standard, and it's being used to support other libraries in the suite, so it's active and likely to stay that way. The Intel Cilk+ runtime and qthreads from sandia, as well as essentially all other green-threads implementations, contain some equivalent functionality to this in order to deal with context saving and switching, so they could all be options as well. Of these, the most open to us and most well maintained cross-platform would probably be qthreads, but as with cilk+ it's hard to divide the context switching functionality from the underlying task scheduler.

Since what we're talking about is creating a shim anyway, I don't see any reason we couldn't change out the underlying implementation to one of the C libraries or something else entirely if the dependency is sufficiently onerous.


trws commented 9 years ago

Had a chat with Stephen Olivier today from Sandia, he confirmed that qthreads has a mechanism like this internally, which is written in C. An alternative version of what I posted before, using that interface, is below. It seems to be the most recently contributed C library version I've seen, but the license situation is a bit odd, it looks like a pile of BSD and MIT-like licenses that have propagated over the past decade or so. Anyway, it does or can do basically everything the boost version does, although it's a bit harder to follow, and will require more pre-processor directives to special-case for pointer and int size for general portability.

#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

#include "include/qthread/qthread.h"
#include "include/qt_context.h"

#define S_SIZE (5*1024)

static qt_context_t main_ctx;
qt_context_t bar_ctx;
typedef enum coroutine_status {
  COROUTINE_FINISHED,
  COROUTINE_IN_PROGRESS
} coroutine_status;

typedef union {
  struct {
    int part1;
    int part2;
  };
  void* complete;
} splitter;

struct coroutine;

typedef void (*coroutine_func)(struct coroutine*);

typedef struct coroutine {
  void* data;
  coroutine_status status;
  qt_context_t ctx;
  qt_context_t return_to;
  bool allocated;
  coroutine_func target;
} coroutine;

coroutine * foo_coro;
coroutine * bar_coro;

void ctx_init(qt_context_t * ctx) {
  ctx->uc_stack.ss_size = S_SIZE;
  ctx->uc_stack.ss_sp = malloc(S_SIZE);
}
qt_context_t* ctx_new() {
  qt_context_t* ret = (qt_context_t*)malloc(sizeof(qt_context_t));
  ctx_init(ret);
  return ret;
}

void ctx_destroy(qt_context_t *ctx) {
  free(ctx->uc_stack.ss_sp);
  ctx->uc_stack.ss_sp = NULL;
}

void wrapper(coroutine * c) {
  // Re-merge the pointer to the coroutine
  fprintf(stderr, "pointer in wrapper: %p, %x, %x\n", c, c, c);

  c->status = COROUTINE_IN_PROGRESS;
  c->target(c);

  c->status = COROUTINE_FINISHED;
  setcontext(&c->return_to);
}

void init_coroutine(coroutine *c) {
  memset(c, 0, sizeof(coroutine));
  ctx_init(&c->ctx);
}

coroutine* new_coroutine() {
  coroutine * c = (coroutine*)malloc(sizeof(coroutine));
  init_coroutine(c);
  return c;
}

void destroy_coroutine(coroutine *c) {
  ctx_destroy(&c->ctx);
}

coroutine* make_coroutine(coroutine_func f) {
  coroutine* c = new_coroutine();
  splitter s = {.complete = c};
  fprintf(stderr, "pointer in make: %p, %p, %x, %x, %lu\n", c, s.complete, s.part1, s.part2, sizeof(int));
  getcontext(&c->return_to);
  qt_makectxt(&c->ctx, (void(*)())&wrapper, 1, c);
  c->target = f;
  return c;
}

void coroutine_set_return(coroutine * c){
  getcontext(&c->return_to);
}

coroutine_status call_coroutine(coroutine*c, void* data) {
  c->data = data;
  qt_swapctxt(&c->return_to, &c->ctx);
  return c->status;
}

void coroutine_yield(coroutine * c) {
  qt_swapctxt(&c->ctx, &c->return_to);
}

void foo(coroutine * c) {
  fprintf(stderr, "In foo\n");
  coroutine_yield(c);
  fprintf(stderr, "back in foo\n");
}

void bar(coroutine * c) {
  fprintf(stderr, "In bar\n");
  coroutine_yield(c);
  fprintf(stderr, "back in bar\n");
}

int main(int argc, char *argv[]) {

  fprintf(stderr, "making coroutines\n");
  foo_coro = make_coroutine(&foo);
  bar_coro = make_coroutine(&bar);

  call_coroutine(foo_coro, NULL);
  call_coroutine(bar_coro, NULL);
  call_coroutine(foo_coro, NULL);
  call_coroutine(bar_coro, NULL);

  fprintf(stderr, "done\n");

  return 0;
}

Output:

making coroutines
pointer in make: 0x100103a50, 0x100103a50, 103a50, 1, 4
pointer in make: 0x100103b40, 0x100103b40, 103b40, 1, 4
pointer in wrapper: 0x100103a50, 103a50, 103a50
In foo
pointer in wrapper: 0x100103b40, 103b40, 103b40
In bar
back in foo
back in bar
done

garlick commented 9 years ago

What about starting with direct calls to makecontext() and friends? Looking at the example code in makecontext(3), this may be straightforward. qthreads/boost could be brought in later should they be needed to solve a real problem with the direct approach such as portability or performance?

A simple approach for one coroutine per handler, limiting handler "queue depth" to one:

This would allow all the methods implemented by a comms module to remain responsive when one of them is blocked on an RPC. Further work in this direction could enable multiple contexts per handler for a configurable request queue depth per method.

By the way, I think our bindings for other languages that have more sophisticated concurrency support like lua or go should not be required to use this. They should have their own native reactors and RPC implementations and whatever else is needed to do the job natively. I'm thinking of this as a bandaid on the C binding.

trws commented 9 years ago

It would basically be a bandaid for C, except that in all of those other languages the coroutines are implemented this way, or one of the other ways in this thread, underneath anyway (except erlang, and some lisp variants... they just don't have stacks). We can certainly use makecontext/getcontext/swapcontext/setcontext for this, it will be almost identical to the qthreads one except that it needs a bit of extra logic to chop the argument pointer into two signed integers and re-assemble them for some strange reason, assign the uc_link pointer to the return context, and parametrize the argument handling. I shied away from that initially for these reasons:

1. It has been officially removed from the POSIX standard (though it still works on Linux, and likely will for the foreseeable future, other unices are actively deprecating it).
2. It is slower than implementing it externally because POSIX requires it to save signal information.
3. Unlike boost, there is no support for using segmented stacks to deal with the stack-size determination problem. You get what you allocate, and that's it. Admittedly, the qthreads version has this same issue.

None of these is a showstopper, and the availability of these is definitely higher with linux as the target platform.
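For reference, the "extra logic" mentioned above, splitting the argument pointer into two integers and setting uc_link, might look roughly like the following. This is a sketch assuming Linux with glibc, where makecontext() forwards int arguments to the entry function; the names toy_task, toy_entry and toy_split_demo are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <ucontext.h>

#define STACK_SIZE (64 * 1024)

struct toy_task { int value; };

/* makecontext() passes int arguments, so a 64-bit pointer arrives as
 * two 32-bit halves that must be reassembled. */
void toy_entry(int hi, int lo)
{
    uint64_t bits = ((uint64_t)(uint32_t)hi << 32) | (uint32_t)lo;
    struct toy_task *t = (struct toy_task *)(uintptr_t)bits;
    t->value = 42;
    /* returning from here switches to the context named by uc_link */
}

int toy_split_demo(void)
{
    struct toy_task t = { 0 };
    ucontext_t main_ctx, co_ctx;
    char *stack = malloc(STACK_SIZE);
    assert(stack != NULL);

    getcontext(&co_ctx);
    co_ctx.uc_stack.ss_sp = stack;
    co_ctx.uc_stack.ss_size = STACK_SIZE;
    co_ctx.uc_link = &main_ctx;         /* resume here when toy_entry returns */

    /* Chop the pointer into two ints for the variadic int arguments. */
    uint64_t bits = (uint64_t)(uintptr_t)&t;
    makecontext(&co_ctx, (void (*)(void))toy_entry, 2,
                (int)(uint32_t)(bits >> 32), (int)(uint32_t)bits);

    swapcontext(&main_ctx, &co_ctx);    /* run toy_entry to completion */
    free(stack);
    return t.value;
}
```

The stack here is a fixed 64 KiB allocation, which illustrates point 3: nothing grows it if the coroutine needs more.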

grondo commented 9 years ago

What about abstracting the necessary functions behind a simple API, implementing that API with makecontext() and friends at first, and extending it with boost or qthreads when the need arises? (Possibly this is what @garlick was suggesting; sorry for being redundant.)

My one fear about this is making the API more tricky for the user -- I hope this stuff will be hidden behind the reactor API..

Also, lua doesn't have a native reactor per se, just bindings to the C reactor. Rewriting the entire flux API "natively" in Lua would be a pretty big job...

trws commented 9 years ago

I agree on all points. We would probably want a coroutine abstraction anyway, especially since makecontext and friends are very complicated to use correctly.

Ideally we would hide it from the user entirely, but if they want to use a blocking call we don't know about, then giving them access to a generic yield or similar may be beneficial.

garlick commented 9 years ago

Yep we can create a simple API for this that could switch the underlying implementation and be independently tested.

Thanks for enumerating those gotchas on makecontext() @trws. I'll have a closer look and see which of the proposed methods will be easiest to try first. The fact that you have done all this investigation has really cut down the overhead needed to get the idea off the ground.

Regarding yield() and API complication, for a start I would like to focus on the problem of blocking RPC calls made from reactor handlers, limit concurrency as proposed, and hide everything. It sounds like that can be done with minimal disruption to APIs.

garlick commented 9 years ago

On language bindings, I just wanted to be careful in my parochial view not to let C's lack of a concurrency model limit what might be possible in languages that do have interesting models. Please keep me in line!

garlick commented 9 years ago

I raised the following questions in the above PR, repeated here along with some references and ideas discussed in today's meeting:

@trws mentioned Rust is dropping segmented stack support.

@trws mentioned compiler features new in gcc4/clang3.6 are required to implement segmented stacks. He also alluded to a proposed kernel "run this thread next" feature that would obviate the need for "green threads" and cited this Google presentation

@trws suggested mprotect.

@dongahn suggested watchpoints

This should clearly be fixed. I was waiting until we've decided what public reactor interface changes we want as that might change how we would implement this.

dongahn commented 9 years ago

If one can live with page granularity, the mprotect scheme that Tom suggested would be a much more established way. For a system like BGQ, they use hardware watchpoints to work around various system limitations. Please refer to section 5.3 of http://www.redbooks.ibm.com/redbooks/pdfs/sg247287.pdf

trws commented 9 years ago

That seems like a good rundown @garlick, and your PR looks really good!

This is just my 2c, but I think you've got what we really need, and that the stack issue can safely be punted as an optimization opportunity to be addressed later. Since we're on 64-bit platforms, I would just gratuitously over-allocate the stack space by a significant amount, as in use the pthreads default of 2 MB or more, and avoid initializing it. As long as the extra pages don't get touched, they won't consume physical memory anyway until the stack grows far enough to start using each page. Then mprotect becomes practical as well.

As general as your interface is, you've prepared it such that we always have the option to replace the implementation under the covers with a self-extending stack mechanism or a standard thread or whatever we like. If the stack space becomes an issue, it can be limited, controlled, or in principle just replaced with a stackless coroutine mechanism, though that last would be particularly painful.
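A rough sketch of the over-allocate-and-guard approach follows, assuming a POSIX system with anonymous mmap() and a stack that grows downward. The names coro_stack, coro_stack_alloc, coro_stack_free and coro_stack_demo are hypothetical and do not come from the coproc source.

```c
#define _DEFAULT_SOURCE         /* for MAP_ANONYMOUS under strict -std modes */
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>
#include <unistd.h>

#define CORO_STACK_SIZE (2 * 1024 * 1024)   /* a page multiple on common systems */

struct coro_stack {
    void *base;                 /* start of the mapping (the guard page) */
    size_t size;                /* total mapping size, guard included */
    size_t guard;               /* guard size, one page */
};

/* Reserve `size` bytes plus one guard page. Untouched pages of an
 * anonymous mapping are not backed by physical memory until used. */
int coro_stack_alloc(struct coro_stack *s, size_t size)
{
    long page = sysconf(_SC_PAGESIZE);
    if (page <= 0)
        return -1;
    s->guard = (size_t)page;
    s->size = size + s->guard;
    s->base = mmap(NULL, s->size, PROT_READ | PROT_WRITE,
                   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (s->base == MAP_FAILED)
        return -1;
    /* The stack grows down, so an overflow hits the lowest page first:
     * make it inaccessible to turn silent corruption into a fault. */
    if (mprotect(s->base, s->guard, PROT_NONE) != 0) {
        munmap(s->base, s->size);
        return -1;
    }
    return 0;
}

void coro_stack_free(struct coro_stack *s)
{
    munmap(s->base, s->size);
}

/* Touch only the top byte of the stack, as a fresh coroutine would. */
int coro_stack_demo(void)
{
    struct coro_stack s;
    if (coro_stack_alloc(&s, CORO_STACK_SIZE) != 0)
        return -1;
    char *top = (char *)s.base + s.size - 1;
    *top = 'x';
    int ok = (*top == 'x');
    coro_stack_free(&s);
    return ok;
}
```

For use with makecontext(), ss_sp would point just above the guard page (base + guard) and ss_size would be the usable size (size - guard).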

garlick commented 9 years ago

Thanks guys!

I will implement the larger default stack size and the running of other reactor handler types (fd, zsock, timer) as coprocs in the short term.

garlick commented 9 years ago

I'm going to go ahead and close this issue. It's noted in the coproc source in case we need to look at other options down the road.