Squadrick / shadesmar

Fast C++ IPC using shared memory
MIT License
555 stars 85 forks source link

use protobuf for big message #68

Open liuhao123t opened 6 months ago

liuhao123t commented 6 months ago

I am trying to use protobuf to send large messages, such as point clouds. When the message is large, the publisher program runs fine, but the subscriber program gets stuck after running for a period of time.

subscriber code

#include <string.h>

#include <chrono>
#include <iostream>
#include <thread>

#include <proto/cloud.pb.h>
#include <shadesmar/pubsub/subscriber.h>

/// Subscriber callback: deserializes one TanwyProtoCloud frame from the
/// received memory block and prints its metadata together with the time
/// spent in protobuf parsing.
///
/// @param msg Block handed over by the subscriber. `msg->ptr` is read as the
///            serialized payload and `msg->size` as its length in bytes.
void callback(shm::memory::Memblock *msg) {
  // Guard against a missing block instead of dereferencing a null pointer.
  if (msg == nullptr) {
    std::cerr << "Received null message." << std::endl;
    return;
  }

  // std::endl (flush) is kept deliberately: when the process stalls, the
  // last flushed line shows how far the callback got.
  std::cout << "start call back" << std::endl;
  shadesmar::TanwyProtoCloud tw_proto_point_cloud;

  const auto start = std::chrono::high_resolution_clock::now();
  // ParseFromArray takes an `int` length, so the narrowing is made explicit.
  // NOTE(review): assumes msg->size fits in an int - confirm for very large
  // payloads.
  if (!tw_proto_point_cloud.ParseFromArray(msg->ptr,
                                           static_cast<int>(msg->size))) {
    std::cerr << "Failed to deserialize message." << std::endl;
    return;
  }
  // End of the timed section.
  const auto end = std::chrono::high_resolution_clock::now();

  // Elapsed parse time, reported in microseconds.
  const auto duration =
      std::chrono::duration_cast<std::chrono::microseconds>(end - start);
  std::cout << "deserialize Time taken: " << duration.count() << " microseconds" << std::endl;

  std::cout << "msg->size " << msg->size << std::endl;
  std::cout << "name: " << tw_proto_point_cloud.name() << std::endl;
  std::cout << "frame_id: " << tw_proto_point_cloud.frame_id() << std::endl;

  // The raw payloads remain available through points() and points_time().
  // The previous unused `float*` / `unsigned int*` locals were removed
  // because they cast away the const-ness of c_str(); consumers should copy
  // the bytes into a typed buffer instead.

  const int num_points = tw_proto_point_cloud.num_points();
  const int point_num_fields = tw_proto_point_cloud.point_num_fields();
  const int time_num_fields = tw_proto_point_cloud.time_num_fields();

  std::cout << "num_points: " << num_points << std::endl;
  std::cout << "point_num_fields: " << point_num_fields << std::endl;
  std::cout << "time_num_fields: " << time_num_fields << std::endl;
}

int main() {
  shm::pubsub::Subscriber sub("test", callback);

  sub.spin();
}

publisher code

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include <shadesmar/pubsub/publisher.h>
#include <proto/cloud.pb.h>
#include <chrono>

/// Publisher entry point: builds one synthetic point-cloud message and
/// publishes it 500 times on the "test" topic, updating only `frame_id`
/// between frames.
///
/// @return 0 on success, 1 if the message could not be serialized.
int main() {
  std::string topic = "test";
  shm::pubsub::Publisher pub(topic);

  shadesmar::TanwyProtoCloud tw_proto_point_cloud;
  const std::string name = "tanway_cloud";
  const double timestamp = 3.1333333;
  const int point_num_fields = 4;
  const int num_points = 100000;
  const int time_num_fields = 2;

  // Vectors own the synthetic payloads, so they are released automatically
  // (the previous new[] buffers were never freed).
  std::vector<float> points(num_points * point_num_fields);
  std::vector<int> points_time(num_points * time_num_fields);
  for (int i = 0; i < num_points; i++) {
    for (int j = 0; j < point_num_fields; j++) {
      points[i * point_num_fields + j] =
          static_cast<float>(i) + static_cast<float>(j) / 10;
    }
    // Use time_num_fields rather than a hard-coded 2 so the fill stays in
    // step with the buffer size if the constant changes.
    for (int k = 0; k < time_num_fields; k++) {
      points_time[i * time_num_fields + k] = i * time_num_fields + k;
    }
  }

  tw_proto_point_cloud.set_name(name);
  tw_proto_point_cloud.set_timestamp(timestamp);
  tw_proto_point_cloud.set_point_num_fields(point_num_fields);
  tw_proto_point_cloud.set_num_points(num_points);
  tw_proto_point_cloud.set_time_num_fields(time_num_fields);
  // Byte counts are derived from the element type actually stored.
  tw_proto_point_cloud.set_points(static_cast<const void*>(points.data()),
                                  points.size() * sizeof(points[0]));
  tw_proto_point_cloud.set_points_time(
      static_cast<const void*>(points_time.data()),
      points_time.size() * sizeof(points_time[0]));

  // One serialization buffer reused across frames instead of new[]/delete[]
  // on every iteration.
  std::vector<char> buffer;
  for (int i = 0; i < 500; i++) {
    tw_proto_point_cloud.set_frame_id(i);

    // ByteSizeLong() returns size_t; keep the full width for publish() and
    // narrow only for SerializeToArray, which takes an int.
    const auto serialized_size = tw_proto_point_cloud.ByteSizeLong();
    buffer.resize(serialized_size);

    if (!tw_proto_point_cloud.SerializeToArray(
            buffer.data(), static_cast<int>(serialized_size))) {
      // Do not publish an incompletely written buffer.
      std::cerr << "Failed to serialize message." << std::endl;
      return 1;
    }

    // Pace the publisher at roughly one frame every 50 ms.
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    pub.publish(static_cast<void*>(buffer.data()), serialized_size);
    std::cerr << "publish : "<< i << std::endl;
  }
  return 0;
}
proto message definition
syntax = "proto2";

package shadesmar;

// Point-cloud frame exchanged between the publisher and subscriber programs
// shown in this issue. All fields are `required` (proto2), so every one must
// be set before the message is serialized.
message TanwyProtoCloud{
    // Label for the cloud; the publisher example sets "tanway_cloud".
    required string name = 1;
    // Frame counter; the publisher example sets it to the loop index.
    required int32 frame_id = 2;
    // Frame timestamp. NOTE(review): unit/epoch is not shown here - confirm.
    required double timestamp = 3;
    // Number of float values stored per point in `points` (4 in the example).
    required int32 point_num_fields = 4;
    // Number of points in this frame.
    required int32 num_points = 5;
    // Raw bytes of the point data; the publisher example copies
    // num_points * point_num_fields floats into it.
    required bytes points = 6;
    // Number of time values stored per point in `points_time` (2 in the example).
    required int32 time_num_fields = 7;
    // Raw bytes of the per-point time data; the publisher example copies
    // num_points * time_num_fields int values into it.
    required bytes points_time = 8;
}
feelwa17 commented 4 months ago

Did you solve this problem?