zeromq / majordomo

Majordomo Project

"Fast requests" accumulate in broker queue and are never dispatched #73

Open JoergSchneiderSimon opened 4 years ago

JoergSchneiderSimon commented 4 years ago

If clients send requests faster than workers can handle them, some of the requests are never dispatched to a worker, because s_dispatch() is only called from handle_ready() and handle_request(). What appears to be missing is a call to s_dispatch() at the end of handle_final() in mdp_broker.c.

To reproduce, run the attached broker, worker and client. Then create multiple requests (say, 5) with the client before handling them in the worker. Only the first one will be processed by the worker. Then create a new request in the client and handle a request in the worker. Now request number 2 will be handled, and so on.
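To make the sequence above easier to follow, here is a toy model of it in plain C. It does not use CZMQ or the real mdp_broker.c at all: the toy_* names, the counters and the dispatch_on_final flag are all made up for illustration, and it assumes a single service with a single worker. It only shows why queued requests get stuck when dispatch runs on READY and REQUEST but not on FINAL.

```c
#include <assert.h>
#include <stdbool.h>

// Hypothetical stand-in for the broker state: one service, one worker.
typedef struct {
    int pending;          // requests queued in the broker
    int waiting_workers;  // idle workers ready to take a request
    int in_flight;        // requests currently being handled by a worker
    int completed;        // requests for which a FINAL reply came back
} toy_broker_t;

// Hand queued requests to waiting workers for as long as both exist.
static void toy_dispatch(toy_broker_t *b)
{
    while (b->pending > 0 && b->waiting_workers > 0) {
        b->pending--;
        b->waiting_workers--;
        b->in_flight++;
    }
}

// A worker announces itself: it becomes waiting, then we dispatch.
static void toy_handle_ready(toy_broker_t *b)
{
    b->waiting_workers++;
    toy_dispatch(b);
}

// A client request arrives: it is queued, then we dispatch.
static void toy_handle_request(toy_broker_t *b)
{
    b->pending++;
    toy_dispatch(b);
}

// A worker sends its final reply and goes back to waiting.
// dispatch_on_final == false models the behaviour reported here,
// dispatch_on_final == true models the proposed fix.
static void toy_handle_final(toy_broker_t *b, bool dispatch_on_final)
{
    assert(b->in_flight > 0);
    b->in_flight--;
    b->completed++;
    b->waiting_workers++;
    if (dispatch_on_final)
        toy_dispatch(b);
}

// One worker signs in, the client sends `requests` requests back to back,
// then the worker answers everything it is given. Returns how many
// requests were completed.
int toy_run(int requests, bool dispatch_on_final)
{
    toy_broker_t b = {0, 0, 0, 0};
    toy_handle_ready(&b);
    for (int i = 0; i < requests; i++)
        toy_handle_request(&b);
    while (b.in_flight > 0)
        toy_handle_final(&b, dispatch_on_final);
    return b.completed;
}
```

In this model, toy_run(5, false) returns 1 (the other four requests stay queued while the worker sits idle), and toy_run(5, true) returns 5.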

mdp_broker_issue_demo.tar.gz

So IMHO, handle_final() (handle_worker_final() in the code) should look like this, with a call to s_service_dispatch() added after the worker is re-added as waiting:

static void
handle_worker_final (client_t *self)
{
    mdp_msg_t *msg = self->message;
    mdp_msg_t *client_msg = mdp_msg_new();
    // Set routing id, messageid, service, body
    zframe_t *address = mdp_msg_address(msg);

    mdp_msg_set_routing_id(client_msg, address);
    mdp_msg_set_id(client_msg, MDP_MSG_CLIENT_FINAL);
    const char *service_name = self->service_name;
    mdp_msg_set_service(client_msg, service_name);
    zmsg_t *body = mdp_msg_get_body(msg);
    mdp_msg_set_body(client_msg, &body);
    mdp_msg_send(client_msg, self->server->router);

    // Add the worker back to the list of waiting workers.
    char *identity = zframe_strhex(mdp_msg_routing_id(msg));

    worker_t *worker =
        (worker_t *) zhash_lookup(self->server->workers, identity);
    assert(worker);
    zlist_append(self->server->waiting, worker);
    service_t *service = (service_t *) zhash_lookup(self->server->services,
        worker->service->name);
    assert(service);
    zlist_append(service->waiting, worker);

    zstr_free(&identity);
    mdp_msg_destroy(&client_msg);
    // Added: dispatch any queued requests now that the worker is waiting again.
    s_service_dispatch(service);
}

cheers,

Joerg