zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.62k stars 2.35k forks source link

Why should I move the `zmq::message_t zmq_message` construction inside the while loop for every message sent? #4534

Open fengmao31 opened 1 year ago

fengmao31 commented 1 year ago

I am trying to use ZeroMQ to transport a Cap'n Proto serialized message, but I get a crash (core dump).

#include "capnproto/message.capnp.h"
  #include <capnp/message.h>
  #include <capnp/serialize.h>
  #include <kj/std/iostream.h>
  #include <zmq.hpp>

// Publisher (failing repro): builds one Cap'n Proto Message, serializes it
// once, wraps the bytes in a single zmq::message_t, and then sends that same
// message object in an endless loop from a PUB socket bound to
// tcp://127.0.0.1:5555.
int main() {

  // Create a message builder with a Message struct as its root.
  capnp::MallocMessageBuilder message;
  Message::Builder messageBuilder = message.initRoot<Message>();

  // Set the preparation and heading fields.
  messageBuilder.setPreparation("prep");
  messageBuilder.setHeading("heading");

  zmq::context_t context(1);
  zmq::socket_t socket(context, ZMQ_PUB);

  socket.bind("tcp://127.0.0.1:5555");

  // Serialize the message to a flat word array and view it as bytes.
  kj::Array<capnp::word> serialized_message =capnp::messageToFlatArray(message);
  auto byteArray = serialized_message.asBytes();

  // Create a ZeroMQ message from the serialized buffer.
  // NOTE(review): byteArray itself is passed here, not byteArray.size(); which
  // message_t constructor this selects depends on the cppzmq version in use —
  // confirm that it yields a message of byteArray.size() bytes before the
  // memcpy below writes that many bytes into it.
  zmq::message_t zmq_message(byteArray);
  memcpy(zmq_message.data(), byteArray.begin(), byteArray.size());

  // NOTE(review): the message object is built once and re-sent on every
  // iteration. libzmq's zmq_msg_send documentation states that a successfully
  // sent message is nullified, so presumably every send after the first
  // publishes an empty message, which would match the subscriber's
  // "Message did not contain a root pointer" error — confirm against the
  // cppzmq send() implementation.
  while (true) {
      socket.send(zmq_message);
  }

  // Not reached: the loop above has no exit.
  return 0;
}
#include "capnproto/message.capnp.h"
#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>

int main()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_SUB);

    socket.connect("tcp://127.0.0.1:5555");  // or *
    socket.set(zmq::sockopt::subscribe, "");
    while (true) {
        zmq::message_t zmq_message;
        socket.recv(zmq_message);
        //if (reinterpret_cast<uintptr_t>(zmq_message.size()) % sizeof(capnp::word) == 0) {
            // message is aligned
            // create a memory buffer from the received message
            // kj::ArrayPtr<capnp::word> buffer(reinterpret_cast<capnp::word*>(zmq_message.data()),
            //                                  zmq_message.size() / sizeof(capnp::word));

            auto buffer = kj::heapArray<capnp::word>(zmq_message.size() / sizeof(capnp::word));
            memcpy(buffer.asBytes().begin(), zmq_message.data(), buffer.asBytes().size());  

            // create an input stream from the memory buffer
            capnp::FlatArrayMessageReader message_reader(buffer);
            Message::Reader message = message_reader.getRoot<Message>();

            // print the preparation and heading fields
            std::cout << "Preparation: " << message.getPreparation().cStr() << std::endl;
            std::cout << "Heading: " << message.getHeading().cStr() << std::endl;
        // } 
        // else {
        //     // message is not aligned
        //     std::cerr << "Agent: receive(): Not aligned " << std::endl;
        //}
    }
    return 0;
}
terminate called after throwing an instance of 'kj::ExceptionImpl'
  what():  capnp/message.c++:99: failed: expected segment != nullptr && segment->checkObject(segment->getStartPtr(), ONE * WORDS); Message did not contain a root pointer.
stack: 55efdad18f3a 55efdacf095f 55efdacefdac 7f6b183de082 55efdacefb2d
    ??:0: returning here
    ??:0: returning here
    ??:0: returning here
    ??:0: returning here
    ??:0: returning here

My code is copied from the first question below, and I tried to apply the answer from the second question. When I run the code from the second question, it works.

https://stackoverflow.com/questions/74975616/zeromq-pub-sub-socket-receiving-error-of-a-serialized-message-object-using-capn

https://stackoverflow.com/questions/75006774/capn-proto-unaligned-data-error-while-trying-to-sendreceive-serialized-object

There were some problems in the publisher code: I forgot to add `.size()`, and the `zmq::message_t zmq_message` construction has to be moved inside the while loop.

#include "capnproto/message.capnp.h"
  #include <capnp/message.h>
  #include <capnp/serialize.h>
  #include <kj/std/iostream.h>
  #include <zmq.hpp>

// Publisher: builds one Cap'n Proto Message, serializes it once, and publishes
// the serialized bytes in an endless loop from a PUB socket bound to
// tcp://127.0.0.1:5555.
int main() {
  // Create a message builder and fill in the preparation and heading fields.
  capnp::MallocMessageBuilder message;
  Message::Builder messageBuilder = message.initRoot<Message>();
  messageBuilder.setPreparation("prep");
  messageBuilder.setHeading("heading");

  zmq::context_t context(1);
  zmq::socket_t socket(context, ZMQ_PUB);
  socket.bind("tcp://127.0.0.1:5555");

  // Serialize once; the flat array is loop-invariant and is only read below.
  kj::Array<capnp::word> serialized_message = capnp::messageToFlatArray(message);
  auto byteArray = serialized_message.asBytes();

  while (true) {
    // A fresh message_t is required for every send: libzmq takes over the
    // payload of a successfully sent message and leaves the object empty, so
    // reusing one instance would publish zero-length messages after the first
    // send. The (pointer, size) constructor allocates the message and copies
    // the bytes in one step, replacing the separate allocate-then-memcpy.
    zmq::message_t zmq_message(byteArray.begin(), byteArray.size());

    // Explicit send_flags overload; the send(message_t&, int) form is
    // deprecated in cppzmq.
    socket.send(zmq_message, zmq::send_flags::none);
  }
  // Not reached: the loop above has no exit, so no return statement follows.
}

There were some problems in the publisher code: I forgot to add `.size()`, and the `zmq::message_t zmq_message` construction has to be moved inside the while loop.

#include "capnproto/message.capnp.h"
  #include <capnp/message.h>
  #include <capnp/serialize.h>
  #include <kj/std/iostream.h>
  #include <zmq.hpp>

// Publisher: builds one Cap'n Proto Message, serializes it once, and publishes
// the serialized bytes in an endless loop from a PUB socket bound to
// tcp://127.0.0.1:5555.
int main() {
  // Create a message builder and fill in the preparation and heading fields.
  capnp::MallocMessageBuilder message;
  Message::Builder messageBuilder = message.initRoot<Message>();
  messageBuilder.setPreparation("prep");
  messageBuilder.setHeading("heading");

  zmq::context_t context(1);
  zmq::socket_t socket(context, ZMQ_PUB);
  socket.bind("tcp://127.0.0.1:5555");

  // Serialize once; the flat array is loop-invariant and is only read below.
  kj::Array<capnp::word> serialized_message = capnp::messageToFlatArray(message);
  auto byteArray = serialized_message.asBytes();

  while (true) {
    // A fresh message_t is required for every send: libzmq takes over the
    // payload of a successfully sent message and leaves the object empty, so
    // reusing one instance would publish zero-length messages after the first
    // send. The (pointer, size) constructor allocates the message and copies
    // the bytes in one step, replacing the separate allocate-then-memcpy.
    zmq::message_t zmq_message(byteArray.begin(), byteArray.size());

    // Explicit send_flags overload; the send(message_t&, int) form is
    // deprecated in cppzmq.
    socket.send(zmq_message, zmq::send_flags::none);
  }
  // Not reached: the loop above has no exit, so no return statement follows.
}

Why do I have to move the `zmq::message_t zmq_message` construction inside the while loop?