uclasystem / dorylus

Dorylus: Affordable, Scalable, and Accurate GNN Training
77 stars 12 forks source link

Invalid message sizes #7

Open sarda-devesh opened 10 months ago

sarda-devesh commented 10 months ago

Hello,

I have been trying to get Dorylus running and am running into an issue where the lambda function times out after 900 seconds. To track down the cause of the bug, I modified src/funcs/gcn/ops/network_ops.cpp to add some logging like this:

...
/**
 * Receive one tensor from `socket` into `mat`.
 *
 * A tensor arrives as two frames: a header frame carrying the tensor name
 * and its dimensions, followed by a data frame carrying the raw values.
 *
 * @param socket  Socket to read the two frames from.
 * @param mat     Output matrix; its name, rows, cols and data are set on success.
 * @return -1 if the header carries ERR_HEADER_FIELD (server-side error);
 *         0 otherwise, including when either recv() call reports failure,
 *         in which case `mat` is left untouched.
 */
int recvTensor(zmq::socket_t& socket, Matrix &mat) {
    zmq::message_t tensorHeader(TENSOR_HDR_SIZE);
    zmq::message_t tensorData;

    std::cout << "Calling recv for tensor header of size " << TENSOR_HDR_SIZE << std::endl;
    if (!socket.recv(&tensorHeader)) {
        return 0;
    }
    std::cout << "Received tensor header" << std::endl;

    unsigned resp = parse<unsigned>((char*)tensorHeader.data(), 0);
    if (resp == ERR_HEADER_FIELD) {
        std::cerr << "Got error from server. Consult graph server output" << std::endl;
        return -1;
    }
    std::string name = parseName((char*)tensorHeader.data());

    std::cout << "Calling receive for tensor data" << std::endl;
    if (!socket.recv(&tensorData)) {
        return 0;
    }
    std::cout << "Received tensor data" << std::endl;

    unsigned rows = parse<unsigned>((char*)tensorHeader.data(), 3);
    unsigned cols = parse<unsigned>((char*)tensorHeader.data(), 4);

    // Do the size arithmetic in size_t so rows * cols cannot wrap in 32 bits.
    const size_t numElems = static_cast<size_t>(rows) * cols;
    const size_t capacity = numElems * sizeof(FeatType);
    const size_t payload = tensorData.size();

    // The header and the data frame are independent messages; never trust
    // the payload to match the advertised dimensions. Copy at most what the
    // buffer can hold, and zero-initialise so a short payload leaves no
    // uninitialised values behind.
    if (payload != capacity) {
        std::cerr << "Tensor data size " << payload << " does not match header size "
                  << capacity << std::endl;
    }
    const size_t copyBytes = payload < capacity ? payload : capacity;

    // Ownership of the buffer passes to `mat` via setData().
    FeatType* data = new FeatType[numElems]();
    std::memcpy(data, tensorData.data(), copyBytes);

    mat.setName(name.c_str());
    mat.setRows(rows);
    mat.setCols(cols);
    mat.setData(data);

    return 0;
}

void logabbleHeader(void* header, unsigned op, Chunk &chunk) {
    char *ptr = (char *)header;
    memcpy(ptr, &op, sizeof(unsigned));
    std::cout << "Sending data of " << op << " of size " << sizeof(unsigned) << std::endl;
    memcpy(ptr + sizeof(unsigned), &chunk, sizeof(chunk));
    std::cout << "Size of chunk is " << sizeof(chunk) << std::endl;
}

/**
 * Request a set of named tensors for `chunk` and collect the replies.
 *
 * Sends one multipart message: a PULL header built by logabbleHeader(),
 * then one tensor header per requested name. Replies are read with
 * recvTensor() while the socket reports more frames (ZMQ_RCVMORE).
 *
 * If any reply is empty, everything collected so far is discarded. When
 * RESEND is set, the whole request is then retried after a sleep that
 * starts at 5ms and grows by 1.5x per attempt, capped at 500ms.
 *
 * @param socket          Socket used for both the request and the replies.
 * @param chunk           Chunk the tensors belong to; its localId goes in each tensor header.
 * @param tensorRequests  Names of the tensors to pull.
 * @return The received matrices, or an empty vector if nothing was requested,
 *         the server reported an error, or an empty reply arrived with RESEND unset.
 */
std::vector<Matrix> reqTensors(zmq::socket_t& socket, Chunk &chunk, std::vector<std::string>& tensorRequests) {
    // Back-off schedule between resends, in microseconds (passed to usleep).
    constexpr unsigned kInitPeriodUs = 5 * 1000u;   // 5ms
    constexpr unsigned kMaxPeriodUs = 500 * 1000u;  // 500ms
    constexpr double kBackoffFactor = 1.5;

    std::vector<Matrix> matrices;

    // With nothing to request, the header below would be sent with
    // ZMQ_SNDMORE and never followed by a closing frame, so the multipart
    // message would never complete. Bail out before touching the socket.
    if (tensorRequests.empty()) {
        return matrices;
    }

    unsigned sleepPeriod = kInitPeriodUs;
    bool empty = true;
    while (true) {
        zmq::message_t header(HEADER_SIZE);
        logabbleHeader(header.data(), OP::PULL, chunk);
        std::cout << "Sending header of size " << HEADER_SIZE << std::endl;
        socket.send(header, ZMQ_SNDMORE);
        std::cout << "Socket sent header" << std::endl;

        const unsigned numTensors = tensorRequests.size();
        std::cout << "Got numTensors of " << numTensors << std::endl;

        for (unsigned u = 0; u < numTensors; ++u) {
            std::string& name = tensorRequests[u];
            zmq::message_t tensorHeader(TENSOR_HDR_SIZE);
            populateHeader(tensorHeader.data(), chunk.localId, name.c_str());
            std::cout << "Populated tensor header of size " << TENSOR_HDR_SIZE << std::endl;

            // Every frame except the last is flagged as "more to come".
            if (u + 1 < numTensors) {
                socket.send(tensorHeader, ZMQ_SNDMORE);
            } else {
                socket.send(tensorHeader);
            }
            std::cout << "Sent tensor header " << u << std::endl;
        }

        unsigned more = 1;
        empty = false;
        // NOTE(review): the loop stops at the first empty reply even if the
        // socket still reports pending frames — confirm those need no draining.
        while (more && !empty) {
            Matrix result;
            std::cout << "Calling recv tensor" << std::endl;
            int ret = recvTensor(socket, result);
            std::cout << "recvTensor returned val of " << ret << std::endl;

            if (ret == -1) {
                // Server-side error: release what we have and give up.
                for (auto& M : matrices) deleteMatrix(M);
                matrices.clear();
                return matrices;
            }

            if (result.empty()) {
                // One empty reply invalidates the whole batch.
                empty = true;
                for (auto& M : matrices) deleteMatrix(M);
                matrices.clear();
            } else {
                matrices.push_back(result);
            }

            size_t usize = sizeof(more);
            socket.getsockopt(ZMQ_RCVMORE, &more, &usize);
        }

        if (RESEND && empty) {
            usleep(sleepPeriod);
            sleepPeriod *= kBackoffFactor;
            sleepPeriod = std::min(sleepPeriod, kMaxPeriodUs);
        } else {
            break;
        }
    }

    return matrices;
}
...

with the rest of the gcn lambda function being the same. When I look at the CloudWatch Log I get the following output:

image

Taking a look at the graph-server source code, I think the issue is in src/graph-server/commmanager/lambdaworker.cpp; specifically, the following checks in LambdaWorker::work are failing:

// recv will return false if timed out.
            if (!workersocket.recv(&identity)) {
                continue;
            }
            if (identity.size() != IDENTITY_SIZE) {
                printLog(manager->nodeId, "identity size %u", identity.size());
                continue;
            }
            if (!workersocket.recv(&header)) {
                continue;
            }
            if (header.size() != HEADER_SIZE) {
                printLog(manager->nodeId, "header size %u", header.size());
                continue;
            }

I added the following log statements to src/graph-server/main.cpp:

const unsigned IDENTITY_SIZE = sizeof(Chunk) + sizeof(unsigned);
    printLog(engine.getNodeId(), "Chunk of size %d, unsigned of size %d, identity of size %d, and header of size %d", sizeof(Chunk), sizeof(unsigned), IDENTITY_SIZE, HEADER_SIZE); 

and get the output of:

[ Node   0 ]  Chunk of size 32, unsigned of size 4, identity of size 36, and header of size 36

This means that the graph server is expecting two messages of size 36 and 36, while the lambda function is sending one packet of size 36 and another packet of size 28, which is causing this issue.

I see that the header size (which is what the graph server checks against) is defined as:

// Size in bytes of a request header: one unsigned followed by a Chunk.
#define HEADER_SIZE (sizeof(unsigned) + sizeof(Chunk))

and chunk has the following definition:

struct Chunk {
    unsigned localId;
    unsigned globalId;
    unsigned lowBound;
    unsigned upBound;
    unsigned layer;
    PROP_TYPE dir;
    unsigned epoch;
    bool vertex;
    ...
};

On the other hand, the lambda function sends a packet of size TENSOR_HDR_SIZE, which is defined as:

// Size in bytes of a tensor header: five unsigned fields plus
// TENSOR_NAME_SIZE bytes reserved for the tensor name.
static const size_t TENSOR_HDR_SIZE = sizeof(unsigned) * 5 + TENSOR_NAME_SIZE;

and it gets populated as such:

populateHeader(tensorHeader.data(), chunk.localId, name.c_str());

which I am assuming calls the following function:

/**
 * Serialize a tensor header into `header`.
 *
 * Layout: `op` in unsigned slot 0, the tensor name in the TENSOR_NAME_SIZE
 * bytes that follow it, then `field1`..`field4` in unsigned slots 3..6.
 *
 * @param header      Destination buffer for the header.
 * @param op          Value stored in slot 0.
 * @param tensorName  NUL-terminated tensor name. Names shorter than
 *                    TENSOR_NAME_SIZE are zero-padded; longer names are
 *                    truncated to TENSOR_NAME_SIZE bytes with no terminator.
 * @param field1      Value stored in slot 3 (default 0).
 * @param field2      Value stored in slot 4 (default 0).
 * @param field3      Value stored in slot 5 (default 0).
 * @param field4      Value stored in slot 6 (default 0).
 */
static inline void
populateHeader(void* header, unsigned op, const char* tensorName, unsigned field1 = 0,
  unsigned field2 = 0, unsigned field3 = 0, unsigned field4 = 0) {
    char* data = (char*)header;
    serialize<unsigned>(data, 0, op);
    // strncpy stops at the terminator and zero-fills the remainder, so a
    // short name is never read past its end (a fixed-length memcpy would be).
    std::strncpy(data + sizeof(unsigned), tensorName, TENSOR_NAME_SIZE);
    serialize<unsigned>(data, 3, field1);
    serialize<unsigned>(data, 4, field2);
    serialize<unsigned>(data, 5, field3);
    serialize<unsigned>(data, 6, field4);
}

I would appreciate some advice on how to resolve this issue.