Open fengmao31 opened 10 months ago
Can you provide full working example?
I will finish a minimal example and send it to you next week, probably after Tuesday.
// based on https://www.codenong.com/45740168/
// based on https://blog.csdn.net/qq_41453285/article/details/106845900
// author samuel
#include "zmq.hpp"
#include <string>
#include <iostream>
#include "protobuf/examples.pb.h"
#include <unistd.h>
#include <thread>
#include <functional>
#include <memory>
#include "protobuf/msg_bridge_frame.pb.h"
using apollo::cyber::examples::proto::Chatter;
using apollo::msg_bridge::proto::Frame;
// File-scope handles for a ZMQ context and a PUB socket.
// NOTE(review): Apple::ThreadFunc works on its own function-local context and
// socket, so these globals appear to be unused in this program — confirm
// before removing them.
std::unique_ptr<zmq::context_t> context;
std::unique_ptr<zmq::socket_t> publisher;
class Apple {
private:
using ThreadPtr = std::unique_ptr<std::thread>;
ThreadPtr thread_;
;
public:
~Apple() { thread_->join(); }
void my_free(void* data, void* hint) {}
void Start() { thread_.reset(new std::thread(&Apple::ThreadFunc, this)); }
bool ThreadFunc()
{
std::unique_ptr<zmq::context_t> context;
std::unique_ptr<zmq::socket_t> publisher;
context = std::make_unique<zmq::context_t>(1);
publisher = std::make_unique<zmq::socket_t>(*context, ZMQ_PUB);
publisher->bind("tcp://127.0.0.1:8888");
std::this_thread::sleep_for(std::chrono::milliseconds(200));
uint64_t seq = 1;
while (true) {
// sleep(1);
seq++;
const std::string topic = "stream1";
zmq::message_t env(topic.data(), topic.size());
std::string info(100,'+');
std::string m(20000000,'+');
auto my_free3 = [](void* data, void* hint) {};
// char* aaa = (char*)malloc(info.size());
// memcpy(aaa, info.data(), info.size());
// std::cout<< aaa<<std::endl;
// std::string aaa_str(aaa,info.size());
// std::cout<< aaa_str<<std::endl;
// char* bbb = (char*)malloc(m.size());
// memcpy(bbb, m.data() , m.size());
zmq::message_t header(const_cast<char*>(info.data()), info.size(), my_free3, nullptr);
zmq::message_t msg(const_cast<char*>(m.data()), m.size(), my_free3, nullptr);
// std::cout << "info: " << std::endl << info << std::endl;
// std::cout << "--------------" << std::endl;
// std::cout << "m: " << std::endl << m << std::endl;
publisher->send(env, ZMQ_SNDMORE);
std::cout << " info size " << info.size() << std::endl;
publisher->send(header, ZMQ_SNDMORE);
std::cout << " m size " << m.size() << std::endl;
publisher->send(msg, ZMQ_NULL | ZMQ_NOBLOCK);
usleep(500);
// if(aaa)free(aaa);
// if(bbb)free(bbb);
}
return 0;
}
};
int main()
{
Apple apple;
apple.Start();
}
//based on https://www.codenong.com/45740168/
//based on https://blog.csdn.net/qq_41453285/article/details/106845900
//author samuel
#include "zmq.hpp"
#include <string>
#include <iostream>
#include "protobuf/examples.pb.h"
#include <thread>
#include "protobuf/msg_bridge_frame.pb.h"
using apollo::cyber::examples::proto::Chatter;
using apollo::msg_bridge::proto::Frame;
// ZMQ context and SUB socket shared between main() and ThreadFunc().
// ThreadFunc() creates both objects; main() also assigns to `subscriber`.
std::unique_ptr<zmq::context_t> context;
std::unique_ptr<zmq::socket_t> subscriber;
// Receive loop executed on the worker thread started by main(); defined below.
bool ThreadFunc();
// Subscriber entry point: runs ThreadFunc on a worker thread and waits for it.
int main()
{
    std::thread thread(ThreadFunc);
    thread.join();
    // Release the socket only after the worker has finished. Resetting
    // `subscriber` while ThreadFunc is still assigning to and reading from it
    // is a data race and can close the socket underneath the receiver.
    subscriber = nullptr;
    return 0;
}
bool ThreadFunc()
{
context = std::make_unique<zmq::context_t>(1);
subscriber = std::make_unique<zmq::socket_t>(*context, ZMQ_SUB);
subscriber->connect("tcp://127.0.0.1:8888");
const std::string topic="stream1";
subscriber->set(zmq::sockopt::subscribe,topic.data());
while (true) {
zmq::message_t env;
subscriber->recv(&env);
std::string env_str = std::string(static_cast<char*>(env.data()), env.size());
zmq::message_t header;
subscriber->recv(&header);
std::string header_str((char *)header.data(),header.size());
std::cout << "Received " << header_str << " on topic A" << std::endl;
zmq::message_t m;
subscriber->recv(&m);
std::string m_str((char*)m.data(), m.size());
std::shared_ptr<Chatter> m_proto_ptr = std::make_shared<Chatter>();
// std::cout << "Received content " << m_str << " on topic A" << std::endl;
}
return 0;
}
When the receiver gets a message from the publisher, the publisher program fails with a "Bad address" error or a segmentation fault. I used gdb to find the point of failure: the thread "ZMQbg/IO/0" received signal SIGABRT.
This looks more complicated than it needs to be. E.g. the my_free function does nothing, so nothing keeps the buffer alive for the message, which will cause reads from deallocated memory. Try constructing the message the simple way.
I want to use this to transport large messages and reduce the communication delay. In my actual program, it crashes when the messages are bigger than 100 KB at 25 Hz. If I skip sending the first message, the remaining messages can be sent, but they cannot be deserialized: what arrives is not the message I wanted to send. In this example, bigger messages can be sent at a higher rate, but it eventually fails with a "Bad address" error or a segmentation fault.
I will fix the code in my_free function and try again. Thank you.
I found that the error occurs when I use the zero-copy way to create a big message and send it. When the transmission volume is large, messages created the zero-copy way have problems, while copied messages work well. With copied messages I can send 2 MB messages at 25 Hz, but with zero-copy messages I can only send 100 KB messages at the same frequency, or 2 MB messages at a low frequency.
These two work well: using alloc to create the message, or using the copy way to create the message.
This one does not work: using the zero-copy way to create the message.