duchengyao / duchengyao.github.io

1 stars 0 forks source link

Fast DDS 的介绍,以及一些调试技巧 #15

Closed duchengyao closed 1 year ago

duchengyao commented 2 years ago

之前分享过一次关于 FastDDS 的调研,简单来说就是其作为消息中间件,具有很强的实时性、可靠性、灵活性等优点,可以简化开发者对通信层面的关注,可以集中精力专注于业务的研究。

0x01 回顾 FastDDS

FastDDS 有两套 API,接近传输层的 RTPS API 和规范的 DDS API

1.1 传输层

包含了兼容各平台的 TCP / UDP / SHM 协议的实现

1.2 RTPS 层

使用 RTPS 层可以直接操作传输层提供的接口,类似直接对内存操作,更加灵活。

int index = 3;
string message = "hello world";

ch->serializedPayload.length = 200;
memcpy(ch->serializedPayload.data, &index, sizeof(index));
memcpy(ch->serializedPayload.data + sizeof(index), message.c_str(), 20);
mp_history->add_change(ch);

std::cout << "index = " << *(int*) ch->serializedPayload.data
          << "; message = " << ch->serializedPayload.data + sizeof(index)
          << std::endl;
-----
>> index = 3; message = hello

1.3 DDS 层

DDS 层与 RTPS 层最大的区别是使用 主题 (Topic) 的概念,在主题的基础上包含发布、订阅、消息过滤等功能。

主题使用接口描述语言 (Interface Description Language, IDL),将传输的数据结构建模为类型化接口:

// HelloWorld.idl

struct HelloWorld
{
    unsigned long index;
    string message;
};

对类型直接操作:

HelloWorld hello;
hello.index(3);
hello.message("hello world");

writer->write(&hello);

1.4 Discovery

FastDDS 的"发现"机制,用于自动发现新上线的客户端。下图分别为分布式(Simple Discovery)、集中式(Discovery Service)发现机制。

image

a. Simple Discovery

RTPS 协议标准的发现机制,通过多播互相发现。

b. Discovery Server

使用 Discovery Server 可以减少流量,且不需要多播功能。

c. Others

无法使用多播,如 Wi-Fi;或已知拓扑结构,想减少流量。

1.5 Listening Locators

Locator_t 代表传输信道,包含 IP 端口 等信息。

Locators 分为两类,Metatraffic locators User locators 。其中,前者主要用于 Discovery,后者用于传输用户的数据。当用户不手动配置 Locators 时,其端口按特定的规则分配:

Traffic type Well-known port expression pid = 1 pId = 2 pId = 3
Metatraffic multicast 7400 + 250 * domainId 7400 7400 7400
Metatraffic unicast 7400 + 250  domainId + 10 + 2  participantId 7412 7414 7416
User multicast 7400 + 250 * domainId + 1 7401 7401 7401
User unicast 7400 + 250  domainId + 11 + 2  participantId 7413 7415 7417

0x02 使用 pthread_mutexattr_setrobust 解决 boost::interprocess 死锁

接下来的内容为,在使用 FastDDS 过程中碰到的两个小问题和解决方案。

2.1 问题复现

boost::interprocess 有一个死锁的 bug,从 18 年就有人提出,21年才修复。最简单的复现方法是执行三次下面的代码:

#include "boost/interprocess/managed_shared_memory.hpp"

int main() {
  // 打开或创建一块名叫“Boost”的共享内存
  boost::interprocess::managed_shared_memory managed_shm{
      boost::interprocess::open_or_create, "Boost", 1024
  };
  // 给共享内存创建写入一个值
  int* i = managed_shm.construct<int>("Int")(99);
}

2.2 修复方法

代码 主要有两处修改。

  1. 在创建锁的时候,设置 pthread_mutexattr_setrobust(&m_attr, PTHREAD_MUTEX_ROBUST)
  2. 在加锁的时候判断锁的状态,如果为EOWNERDEAD (owner dead)ENOTRECOVERABLE (not recoverable),标记为这个锁已经损坏,抛出异常。

0x03 使用 git squash 保持干净的主分支,及相应的调试方法

先炫耀一下我已经给 4 个大型项目 贡献过代码,包括深度学习框架3D 打印机固件FastDDS image

3.1 通过 git squash 合并分支

有的项目要求一个 commit 必须包含完整的功能,但在开发时通常会提交很多细粒度的 commit,比如 add test clean code 等。这种情况下可以使用 git squash 将多个 commit 压缩,得到干净的主分支.

git merge --squash tmp
git commit -m "squash tmp"

# In the following graph, G is c--d--e--f--g squashed together

          X-------------G stable
         /
a---b---c---d---e---f---g tmp

3.2 调试被 squash 后的代码

squash 的优点是可以得到简洁的 commit 记录,但缺失详细的 commit 记录会对后面 debug 造成一些影响。

比如,使用FastDDS后,我一直在调试一个无法重连的bug,通过 log 可以定位到这个 bug 是在一次 2000 行的 commit 引入的。如果现在开始读这两千行代码肯定会浪费很多时间。

现在要借助 github,在 pull request 页面可以找到详细的 23 个 commit 记录,并定位到引入 bug 的 commit:

image

为了防止这个 commit 不完整,我们需要手动下载引入 bug 的代码,和引入 bug 之前的代码,分别做一次回归测试。(手动下载代码的原因是,这些 commit ID 不在 git repo 里,无法 checkout)

在 master 里改完 bug 以后,为了防止引入新 bug,要再跑一次回归测试。