flux-framework / flux-core

core services for the Flux resource management framework
GNU Lesser General Public License v3.0

flux_event_unsubscribe #505

Closed lipari closed 8 years ago

lipari commented 8 years ago

There is either something not working with flux_event_unsubscribe() or it is being used incorrectly. I am debugging a sched problem that has arisen in the latest flux-core. The one thing that seems strange to me is that the unsubscribe in the snippet from flux-waitjob.c is not working. What I see are repeated callbacks to sync_event_cb() after the first callback is received and "hb" is unsubscribed from. I even placed a one-second sleep to be sure it wasn't due to a delay in processing the unsubscribe. But sync_event_cb() still keeps getting called:

static void sync_event_cb (flux_t h, flux_msg_handler_t *w,
                           const flux_msg_t *msg, void *arg)
{
    wjctx_t *ctx = getctx (h);

    if (ctx->start)
        create_outfile (ctx->start);

    if (flux_event_unsubscribe (h, "hb") < 0) {
        flux_log (h, LOG_ERR, "%s: flux_event_unsubscribe hb: %s",
                 __FUNCTION__, strerror (errno));
    }
    if (jsc_notify_status (h, waitjob_cb, (void *)h) != 0) {
        flux_log (h, LOG_ERR, "failed to reg a waitjob CB");
    }
    if (complete_job (ctx)) {
        if (ctx->complete)
            create_outfile (ctx->complete);
        flux_log (ctx->h, LOG_INFO, "sync_event_cb: completion detected");
    }
    return;
}

static struct flux_msg_handler_spec htab[] = {
    { FLUX_MSGTYPE_EVENT,     "hb", sync_event_cb},
    FLUX_MSGHANDLER_TABLE_END
};

garlick commented 8 years ago

Is there a balanced number of subscribes and unsubscribes? If you subscribe twice you have to unsubscribe twice to stop receiving the event.

lipari commented 8 years ago

Yes, I considered that. From what I read in the code, and from the diagnostics I've inserted, there is only one subscribe.

garlick commented 8 years ago

I am on the road now but will double check the code later today. Unsubscribe is not used that much so I could imagine it might be broken without it being obvious.

lipari commented 8 years ago

Ok, thanks. I'll still try and triangulate if and where it broke.

grondo commented 8 years ago

@lipari, if you have a simple standalone reproducer, we can git bisect (and add a test to the testsuite).

lipari commented 8 years ago

Ok. I'm searching for the good builds now.

lipari commented 8 years ago

Ok, the reproducer is flux -x . waitjob -s hello -c there 1, run in the flux-sched/sched directory. It basically waits for job 1 to appear.

I found a good flux-core build: bcb62be6c7691e58a7fbfbdcd5c1f02a3377b3f3. This was when PR 496 was merged.

With the good flux-core build, after invoking the reproducer above, flux waitjob's sync_event_cb() is called once. With the current flux-core build, it is called repeatedly.

lipari commented 8 years ago

@grondo or @garlick, I will attempt to identify exactly which commit caused the problem. Just want to make sure you are not already doing the same.

grondo commented 8 years ago

I wasn't looking into it just yet, no.

garlick commented 8 years ago

I am still away from a computer but will try to follow up later this evening.

dongahn commented 8 years ago

FYI -- there was a recent related question about when messages queue. With that clarification, sync_event_cb can go away completely.

Is there a balanced number of subscribes and unsubscribes?

@garlick: could you tell me more about this? The point of this handler was to get a heartbeat event handler invoked once and then call flux_event_unsubscribe (). And in jsc_notify_status, jsc issues a couple of new flux_event_subscribe calls -- on different job events. Can this be problematic?

grondo commented 8 years ago

@dongahn, can you explain perhaps more rigorously the logic being used here? e.g. are you using the heartbeat event handler as a way to get a callback invoked "under the reactor" to register other event handlers? Once the other events are subscribed, you then unsubscribe from the heartbeat handler?

grondo commented 8 years ago

Is there a balanced number of subscribes and unsubscribes?

@garlick: could you tell me more about this?

I think @garlick just meant that if you subscribe to an event twice, you must also unsubscribe twice.
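
(For illustration -- not from the thread -- a minimal sketch of that reference-counted behavior, reusing the calls from the snippets above; error handling omitted:)

#include <flux/core.h>

/* Each subscribe must be balanced by an unsubscribe before "hb"
 * events actually stop being delivered to this handle. */
static void subscribe_twice_example (flux_t h)
{
    flux_event_subscribe (h, "hb");
    flux_event_subscribe (h, "hb");    /* second reference */

    flux_event_unsubscribe (h, "hb");  /* still subscribed: one reference remains */
    flux_event_unsubscribe (h, "hb");  /* now "hb" delivery stops */
}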

dongahn commented 8 years ago

Yes. This was from my incorrect assumption described in #498

dongahn commented 8 years ago

Could this be the case where the heartbeat events are queued up before seeing flux_event_unsubscribe () and the callback is called that many times?

lipari commented 8 years ago

Ok, it looks like the unsubscribe broke in commit a4ba490. The commit prior to that, 5ab4e7b, works fine.

grondo commented 8 years ago

@lipari, nice work! Thanks!

grondo commented 8 years ago

Since the change in a4ba490 was ostensibly to call all matching event handlers instead of only the first match, could the problem here be that waitjob is indeed subscribing multiple times to "matching" events?

dongahn commented 8 years ago

@grondo: I looked at the code once again and couldn't find multiple matching subscriptions. I could use a second pair of eyes, though:

[flux-waitjob.c](https://github.com/flux-framework/flux-sched/blob/master/sched/flux-waitjob.c)

JSC also uses flux_event_subscribe for job events and jsc events.

But if the heartbeat callback is invoked multiple times, you will end up subscribing to the job and jsc events multiple times.

grondo commented 8 years ago

Once @garlick has a chance to look he may be able to find the problem a lot more quickly than I. I was looking at flux-waitjob and could not initially see any problem either, but I am not familiar enough with JSC and quickly got lost on waitjob event subscribes vs JSC event subscribes... I'll look more later, but it would be really helpful (and useful for testing) if we could boil this one down to a standalone reproducer (a single .c file, Python script, or Lua script)...

dongahn commented 8 years ago

@lipari and @garlick: if the heartbeat callbacks are indeed called multiple times because of the heartbeats queued up between the subscription point and the un-subscription point within the first callback invocation, what should the proper semantics be? My guess is to still invoke the callbacks, because those events happened before the unsubscription point?

lipari commented 8 years ago

What I've found is that if I place a return midway through the function:

static void sync_event_cb (flux_t h, flux_msg_handler_t *w,
                           const flux_msg_t *msg, void *arg)
{
    wjctx_t *ctx = getctx (h);

    if (ctx->start)
        create_outfile (ctx->start);

    if (flux_event_unsubscribe (h, "hb") < 0) {
        flux_log (h, LOG_ERR, "%s: flux_event_unsubscribe hb: %s",
                 __FUNCTION__, strerror (errno));
    }
return;
    if (jsc_notify_status (h, waitjob_cb, (void *)h) != 0) {
        flux_log (h, LOG_ERR, "failed to reg a waitjob CB");
    }
    if (complete_job (ctx)) {
        if (ctx->complete)
            create_outfile (ctx->complete);
        flux_log (ctx->h, LOG_INFO, "sync_event_cb: completion detected");
    }
    return;
}

then the continual sync_event_cb() call problem stops. If you look at the jsc_notify_status_obj() function in modules/libjsc/jstatctl.c, you'll see that it subscribes to different events of its own.

lipari commented 8 years ago

...and if I move the return statement to just before complete_job(), the problem reappears. So, it must be related to those two flux_event_subscribe()'s in jsc_notify_status_obj().

dongahn commented 8 years ago

Can this be a reproducer?

#include <stdio.h>
#include <getopt.h>
#include <errno.h>
#include <czmq.h>
#include <json.h>
#include <flux/core.h>

#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/jsonutil.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"

void testev_cb (flux_t h, flux_msg_handler_t *w,
                       const flux_msg_t *msg, void *arg)

{   
    flux_log (h, LOG_INFO, "callback called");
    if (flux_event_unsubscribe (h, "hb") < 0) {
        flux_log (h, LOG_ERR, "%s: flux_event_unsubscribe hb: %s",
                 __FUNCTION__, strerror (errno));
    }
}

static struct flux_msg_handler_spec htab[] = {
    { FLUX_MSGTYPE_EVENT, "hb", testev_cb},
      FLUX_MSGHANDLER_TABLE_END
};

int main ()
{
    flux_t h;

    h = flux_open (NULL, 0);

    flux_event_subscribe (h, "hb");
    if (flux_msg_handler_addvec (h, htab, (void *)h) < 0) {
        flux_log (h, LOG_ERR, "registering resource event handler: %s",
                  strerror (errno));
        return -1;
    }

    //Use sleep first
    //Please also try the stop with a debugger
    sleep (10);

    flux_log (h, LOG_INFO, "woke up");

    flux_reactor_start (h);
    flux_close (h);

    return 0;
}

cab670{dahn}44: /nfs/tmp2/dahn/FLUXDEV/flux-core/src/cmd/flux -v -M/nfs/tmp2/dahn/FLUXDEV/flux-sched/sched -C/nfs/tmp2/dahn/FLUXDEV/flux-sched/rdl/\?.so -L/nfs/tmp2/dahn/FLUXDEV/flux-sched/rdl/\?.lua start --size 2

cab670{dahn}21: flux -x /nfs/tmp2/dahn/FLUXDEV/flux-sched/sched test &
[1] 124494
cab670{dahn}22: [1450745632.106689] unknown.info[0]: woke up
[1450745632.107160] unknown.info[0]: callback called
[1450745632.107411] unknown.info[0]: callback called
[1450745632.107502] unknown.info[0]: callback called
[1450745632.107582] unknown.info[0]: callback called
[1450745632.107680] unknown.info[0]: callback called

It seems the callback is invoked 5 times. But as I said above, I am not sure whether this is a problem or just the way that eventing should work.

Also, I'm curious why the recent commit changed the behavior.

dongahn commented 8 years ago

FWIW: the jsc code where the event subscription is happening is the following.

static struct flux_msg_handler_spec htab[] = {
    { FLUX_MSGTYPE_EVENT,     "wreck.state.*", job_state_cb},
    { FLUX_MSGTYPE_EVENT,     "jsc.state.*",   job_state_cb},
      FLUX_MSGHANDLER_TABLE_END
};

int jsc_notify_status_obj (flux_t h, jsc_handler_obj_f func, void *d)
{
    int rc = -1;
    cb_pair_t *c = NULL; 
    jscctx_t *ctx = NULL; 

    if (flux_event_subscribe (h, "wreck.state.") < 0) {
        flux_log (h, LOG_ERR, "subscribing to job event: %s", 
                     strerror (errno));
        rc = -1;
        goto done;
    }
    if (flux_event_subscribe (h, "jsc.state.") < 0) {
        flux_log (h, LOG_ERR, "subscribing to job event: %s", 
                     strerror (errno));
        rc = -1;
        goto done;
    }
    if (flux_msg_handler_addvec (h, htab, (void *)ctx) < 0) {
        flux_log (h, LOG_ERR, "registering resource event handler: %s",
                  strerror (errno));
        rc = -1;
        goto done;
    }

dongahn commented 8 years ago

If the queued-up heartbeat events happen to be delivered to waitjob and end up invoking the callback multiple times, jsc will end up subscribing to the events multiple times -- this may be related to the failure mode @lipari is seeing?

garlick commented 8 years ago

@dongahn's reproducer above reproduces the expected behavior. Events are queued on the flux_t handle between subscribe and starting the reactor. The unsubscribe in the handler stops future events from being queued, but the ones in the queue still result in a callback.

garlick commented 8 years ago

Also, I'm curious why the recent commit changed the behavior.

We can verify in case something unintended is happening, but it should not have changed the behavior of the reproducer. The commit found by the bisection just ensures that if multiple handlers match a single event message on a given handle, they each get a copy, rather than only the first match.
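
(A hedged illustration of that dispatch change -- hypothetical handlers, not code from flux-core or this issue:)

#include <flux/core.h>

/* Two entries whose topics both match an "hb" event.  After the commit
 * described above, each handler receives its own copy of the message;
 * previously only the first matching entry was invoked. */
static void hb_cb_one (flux_t h, flux_msg_handler_t *w,
                       const flux_msg_t *msg, void *arg)
{
    flux_log (h, LOG_INFO, "first hb handler called");
}

static void hb_cb_two (flux_t h, flux_msg_handler_t *w,
                       const flux_msg_t *msg, void *arg)
{
    flux_log (h, LOG_INFO, "second hb handler called");
}

static struct flux_msg_handler_spec htab[] = {
    { FLUX_MSGTYPE_EVENT, "hb", hb_cb_one },
    { FLUX_MSGTYPE_EVENT, "hb", hb_cb_two },
    FLUX_MSGHANDLER_TABLE_END
};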

dongahn commented 8 years ago

@garlick:

Thanks. I am not 100% sure if my reproducer reproduces what @lipari saw, but this is certainly helpful!

It seems the lesson here is: make any event callback "queued-events safe"? With these event queueing semantics, an event callback can still be invoked well after flux_event_unsubscribe for that event returns, because of queueing -- which isn't something I could immediately take into account in the initial coding. Perhaps we can document this somewhere...

I will change waitjob.c to remove the heartbeat callback entirely; hopefully this will fix @lipari's repro as well.
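
(One way to make a callback "queued-events safe" in the sense described above is the one-shot guard @lipari uses later in this thread: remember that the handler already fired and ignore redundant deliveries. A sketch under that assumption, not the actual waitjob change:)

#include <errno.h>
#include <stdbool.h>
#include <string.h>
#include <flux/core.h>

static void one_shot_hb_cb (flux_t h, flux_msg_handler_t *w,
                            const flux_msg_t *msg, void *arg)
{
    static bool fired = false;

    /* Queued "hb" messages may still be delivered after the unsubscribe
     * below, so ignore everything past the first invocation. */
    if (fired)
        return;
    fired = true;

    if (flux_event_unsubscribe (h, "hb") < 0)
        flux_log (h, LOG_ERR, "flux_event_unsubscribe hb: %s",
                  strerror (errno));
    /* ... one-shot work goes here ... */
}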

grondo commented 8 years ago

@dongahn: Maybe a good way to document would be that "an event subscriber will receive all event messages that are generated between flux_event_subscribe and matching flux_event_unsubscribe on the event emitting broker (currently rank 0)"

This not only covers the case of queued events, but also covers event messages that may be in flight between when unsubscribe msg is sent from the caller and received by the event emitter. (?)

dongahn commented 8 years ago

Sounds reasonable to me. Perhaps we can augment a manpage? To be clear, the current behavior is okay as long as people like me understand it and take its semantics into account for safer coding :-)

grondo commented 8 years ago

I actually think I wrote before thinking above -- I'm not sure the wording applies exactly as I said. However, adding to the manpage is a great idea, that is exactly where this belongs. I'll bet @garlick will come up with much more precise language than I did above. (I confused myself for a bit on how event subscription works exactly.)

dongahn commented 8 years ago

Relying on @garlick sounds like the best idea.

If applicable, I wouldn't shy away from mentioning some mechanisms like an "event queue" in the man page for clarity -- e.g., "flux_event_unsubscribe does not delete events from the event queue" and thus...

lipari commented 8 years ago

I concur that there are two issues with somewhat overlapping behavior. One is the behavior exhibited by @dongahn 's reproducer. I think we all understand that phenomenon. But the problem I observed seems more like cross-talk in the event registration. The event subscriptions in the jsc code are cancelling out the unsubscribe of the hb event in the flux waitjob code.

grondo commented 8 years ago

My 'rank 0' statement above was because I was confusing subscribe with publish (sorry about that).

If you ignore my statements about "event emitter" then maybe it makes more sense.

I'm not sure there is an "event queue" so maybe just saying that "event messages will be delivered to the handle between subscribe and unsubscribe" as @garlick said above? (I actually missed his statement above which seems pretty clear -- I fear I just confused things with my misstatement above)

Somewhere there perhaps needs to be a reminder that messages in general are "queued" in the handle and only delivered to callbacks during reactor_run or via flux_recv?

grondo commented 8 years ago

@lipari -- do we yet understand why the problem occurs only after commit a4ba490? (Sorry if I just missed it -- I'd still like to develop a standalone test that illustrates the broken behavior, unless we've concluded it was a hidden bug uncovered by the referenced commit?)

lipari commented 8 years ago

Ok, tomorrow I'll work on creating a standalone test that demos the scenario I've observed. I'll start with @dongahn's test above and just add a couple more flux_event_subscribe() calls for different events and see whether that causes the "hb" unsubscription to be compromised.

garlick commented 8 years ago

It definitely seems worth warning that events can be received after you have unsubscribed from them, as that seems counter-intuitive. The two situations when that can occur are 1) when events are already queued in the handle when the unsubscribe request is processed upstream, and 2) when the event matches another subscription topic.

lipari commented 8 years ago

@garlick, I understand 1) but am confused on 2). If I subscribe to the "hb" event, unsubscribe from the "hb" event, and then subscribe to the "wreck.state" and "jsc.state" events, "hb" doesn't match "wreck.state" or "jsc.state". So I should eventually stop getting callbacks for the "hb" event, once the events queued in the handle are depleted. Correct?

garlick commented 8 years ago

@lipari: correct, I was just listing the other way you might get an event after you unsubscribe from it, e.g.

flux_event_subscribe ("foo.bar");
flux_event_subscribe ("foo.bar.baz");
flux_event_unsubscribe ("foo.bar.baz");
// events matching foo.bar (including foo.bar.baz) will still be received
// same deal if both subscription strings were identical

I was wondering: what is the reason for subscribing to heartbeats, then in the heartbeat handler, subscribing to other events? Why not skip the heartbeat and subscribe to the other events right off?
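
(A sketch of what skipping the heartbeat could look like -- using names from the earlier flux-waitjob.c snippet, not an actual patch: register the JSC callback directly before entering the reactor, with no "hb" subscribe/unsubscribe at all.)

#include <flux/core.h>

/* waitjob_cb and jsc_notify_status are from flux-waitjob.c / libjsc
 * as quoted earlier in this thread. */
static int setup_notification (flux_t h)
{
    if (jsc_notify_status (h, waitjob_cb, (void *)h) != 0) {
        flux_log (h, LOG_ERR, "failed to reg a waitjob CB");
        return -1;
    }
    return 0;
}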

dongahn commented 8 years ago

@garlick: that will be done as I mentioned above. This was from the incorrect assumption @grondo and I described the other day.

garlick commented 8 years ago

Oh sorry, I thought that comment above referred to something else. Thanks.

dongahn commented 8 years ago

No problem. I am bailing out for now though. More fun for tomorrow.

grondo commented 8 years ago

Just playing around with the reproducer, the following code (in Lua because it is quick) works as expected. Removing the sleep (2) results in a single hb event being delivered within the 5s timeout; otherwise the 2nd hb event is queued while we're sleeping in the msg handler.

#!/usr/bin/lua
local flux = require 'flux'
local sleep = require 'posix'.sleep
local f,err = flux.new ()
if not f then error (err) end

local event_count = 0

f:subscribe ("hb")
f:subscribe ("test")
f:msghandler {
    pattern = "hb",
    msgtypes = { flux.MSGTYPE_EVENT },
    handler = function (f, msg, mh)
        event_count = event_count + 1
        print (msg.tag, require 'inspect' (msg.data))
        -- wait for next hb event to be queued, then unsubscribe
        sleep (2)
        f:unsubscribe ("hb")
    end
}

-- Wait 5 seconds in reactor, then exit
f:timer {
    timeout = 5000,
    handler = function () f:reactor_stop () end
}
f:reactor ()
assert (event_count == 2)

-- vi: ts=4 sw=4 expandtab
$ lua ev.lua                                                                                                                                            
hb      {
  epoch = 1820
}
hb      {
  epoch = 1821
}

lipari commented 8 years ago

Ok, I've got a reproducer! And more info!

Place the following in flux-core/src/cmd:

$ cat src/cmd/flux-test.c
#include <stdio.h>
#include <errno.h>
#include <czmq.h>
#include <json.h>
#include <flux/core.h>
#include "src/common/libutil/log.h"

static void wreck_state_cb (flux_t h, flux_msg_handler_t *w,
                            const flux_msg_t *msg, void *arg)
{
    flux_log (h, LOG_INFO, "wreck_state_cb callback called");
}

static struct flux_msg_handler_spec htab2[] = {
    { FLUX_MSGTYPE_EVENT,     "wreck.state", wreck_state_cb},
    FLUX_MSGHANDLER_TABLE_END
};

void testev_cb (flux_t h, flux_msg_handler_t *w,
                const flux_msg_t *msg, void *arg)

{
    static int count = 0;
    flux_log (h, LOG_INFO, "testev_cb callback called %d", count);

    /* if (count++) */
    /*     return; */

    if (flux_event_unsubscribe (h, "hb") < 0) {
        flux_log (h, LOG_ERR, "unsubscribe hb: %s",
                  strerror (errno));
    }
    if (flux_event_subscribe (h, "wreck.state") < 0) {
        flux_log (h, LOG_ERR, "subscribing to wreck.state: %s",
                  strerror (errno));
    }
    if (flux_msg_handler_addvec (h, htab2, (void *)h) < 0) {
        flux_log (h, LOG_ERR, "registering htab2: %s",
                  strerror (errno));
    }
}

static struct flux_msg_handler_spec htab1[] = {
    { FLUX_MSGTYPE_EVENT, "hb", testev_cb},
      FLUX_MSGHANDLER_TABLE_END
};

int main (int argc, char *argv[])
{
    flux_t h = flux_open (NULL, 0);

    log_init ("flux-test");

    if (flux_event_subscribe (h, "hb") < 0) {
        flux_log (h, LOG_ERR, "subscribing to hb: %s",
                  strerror (errno));
    }

    if (flux_msg_handler_addvec (h, htab1, (void *)h) < 0) {
        flux_log (h, LOG_ERR, "registering htab1: %s",
                  strerror (errno));
        return -1;
    }

    flux_log (h, LOG_INFO, "starting reactor");
    flux_reactor_run (flux_get_reactor (h), 0);

    flux_close (h);

    return 0;
}

Build flux-test (I just added it to the Makefile). Now invoke the following:

src/cmd/flux start -s 1
src/cmd/flux test

and watch the fun begin:

[1450802870.607669] unknown.info[0]: starting reactor
[1450802870.899479] unknown.info[0]: testev_cb callback called 0
[1450802870.899789] unknown.info[0]: testev_cb callback called 0
[1450802870.899948] unknown.info[0]: testev_cb callback called 0
[1450802870.900102] unknown.info[0]: testev_cb callback called 0
[...]

Exit the flux instance, uncomment these two lines, and build again.

    /* if (count++) */
    /*     return; */

Now run the two commands again. This time testev_cb is called only twice:

src/cmd/flux start -s 1
[1450802918.532222] broker.info[0]: nodeset: 0 (complete)
[1450802918.532305] broker.info[0]: starting initial program
src/cmd/flux test
[1450802933.205153] unknown.info[0]: starting reactor
[1450802934.359274] unknown.info[0]: testev_cb callback called 0
[1450802934.360015] unknown.info[0]: testev_cb callback called 1

grondo commented 8 years ago

Cool, Thanks!

If I add a fprintf (stderr, "msg=%p\n", msg) to testev_cb it looks like the same msg is being delivered to the callback over and over. I wonder if adding a watcher in the callback is causing the message to be requeued? (The message is obviously delivered immediately based on the posted timing, so these aren't "new" hb events...)
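
(Roughly where that debug line could be dropped into the reproducer's testev_cb above -- a sketch, not the exact diff:)

void testev_cb (flux_t h, flux_msg_handler_t *w,
                const flux_msg_t *msg, void *arg)
{
    /* If this pointer repeats across calls, the same queued message is
     * being redelivered rather than fresh "hb" events arriving. */
    fprintf (stderr, "msg=%p\n", (void *)msg);
    flux_log (h, LOG_INFO, "testev_cb callback called");
    /* ... unsubscribe/subscribe/addvec logic as in the reproducer ... */
}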

grondo commented 8 years ago

Through further debugging, I've found that adding the new msg_handler from within a handler is definitely the cause of the issue. I've updated the Lua test to optionally add a new msghandler in the "hb" callback, and this causes the failure:

Working case without f:msghandler inside of "hb" handler:

$ ev.lua
hb      {
  epoch = 1095
}
hb      {
  epoch = 1096
}
calling reactor_stop()

With a msghandler added in the "hb" callback:

 $ lua ev.lua add_handler
dispatch_message m=0x1a414a0, handlers=1
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
hb      {
  epoch = 1082
}
^Clua: ev.lua:45: pcall: ev.lua:24: interrupted!
stack traceback:
        [C]: in function 'reactor'
        ev.lua:45: in main chunk
        [C]: ?

Note that

  1. This verifies the same message is being delivered to the callback (note the epoch is always the same)
  2. Other reactor events don't seem to be processed (I had to Ctrl-C out of the loop as the 5s timer event never fired)

garlick commented 8 years ago

That's a good clue, thanks guys! The dispatch code is walking a zlist of message handlers, and adding a message handler would modify the list mid-walk. The offending commit you found changed the behavior from breaking out of the loop on delivery to continuing on, so that definitely fits with the theory.
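
(A stripped-down illustration of that hazard -- a toy loop, not the flux-core dispatcher: once the walk no longer breaks after the first match, a callback that appends a handler to the same list perturbs the cursor that zlist_next() is using.)

#include <czmq.h>

struct handler {
    void (*cb) (zlist_t *handlers, void *arg);
    void *arg;
};

/* Toy dispatch: visit every handler for one message.  If a callback
 * calls zlist_append() on the same list, the list changes underneath
 * the zlist_first()/zlist_next() cursor, so entries added mid-walk are
 * also visited for the current message and the traversal no longer
 * matches what the loop assumed when it started. */
static void dispatch_all (zlist_t *handlers)
{
    struct handler *hnd = zlist_first (handlers);
    while (hnd) {
        hnd->cb (handlers, hnd->arg);
        hnd = zlist_next (handlers);
    }
}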

grondo commented 8 years ago

Oh, good find @garlick.

garlick commented 8 years ago

I think this was fully addressed by PR #509; please reopen if not.