mutouyun / cpp-ipc

C++ IPC Library: A high-performance inter-process communication using shared memory on Linux/Windows.
Other
1.76k stars 337 forks source link

能否提供更加复杂一点的Sample? #35

Open caibf opened 3 years ago

caibf commented 3 years ago

比如想实现类似HTTP请求的功能,需要用到ipc::channel支持多写多读。怎么实现进程1同一时刻多发,进程2同一时刻多收,分发处理,分发响应,不需要Lock?

mutouyun commented 3 years ago

channel 自身不是线程安全的,每个线程持有自己的 channel,然后 connect 到同一个 name 上就可以了。

同一个 channel 上的所有接收者是抢占式的,你需要自己做分发。 如果你需要 dispatch 的话,使用不同的 name 就可以了(类似 pub-sub),而 channel 的 name,就相当于 topic。

caibf commented 3 years ago

谢谢您的建议~ 我现在使用不同的name创建不同组channel,实现了消息的”并发“dispatcher,消息包采用Jsoncpp来打包解析。这个库使我受益很大,后续要多多研究内部机制与实现。

还有一个问题想请教下,虽然是基于共享内存达到不同进程间的数据通信,但API接口接收都是const char* 或std::string。如果我想传递复杂类型,比如std::vector之类的三维数据点集,要从Jsoncpp封送为std::string,再调用API分发,这样效率大打折扣,有没其它更优的方式呢?

mutouyun commented 3 years ago

你需要一个序列化库,比如 protobuf、capnp、flatbuf,你可以参考下这些库。 这个库里的序列化实现也可以参考下:https://github.com/qicosmos/rest_rpc

caibf commented 3 years ago

如果要将数据序列化,这性能就低了啊,我现在就是使用Jsoncpp序列化库。

mutouyun commented 3 years ago

共享内存在不同进程里的地址不一定相同,因此你无法直接将指针传递出去,你需要一个类似 boost::flat_buffer 的东西将内存内容平坦化。 除非你的数据本身就是可平凡拷贝的(is_trivially_copyable),你可以免去序列化的步骤。

目前的接口在发送时怎样都得拷贝一次数据。如果不想拷贝,就需要直接使用共享内存(而不是new/malloc)来构造自己的对象,之后通过 ipc 将它的“句柄”发出去。 由于共享内存的打开和关闭也是耗时操作,所以你需要自己建立一个共享内存池来做缓存。

Jsoncpp的性能是很低的,因为它本就不是为性能考虑的。需要性能的话,首先就不能考虑json,而应该使用idl。

caibf commented 3 years ago

我在cpp-ipc的基础之上,利用Boost共享内存机制解决了数据的“拷贝”问题。

#define BOOST_USE_WINDOWS_H

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>

typedef boost::interprocess::managed_shared_memory::segment_manager segment_manager_t;
typedef boost::interprocess::allocator<void, segment_manager_t> void_allocator;
typedef boost::interprocess::allocator<Eigen::Vector3d, segment_manager_t> shmem_allocator;
typedef boost::interprocess::vector<Eigen::Vector3d, shmem_allocator> vector_vector3d;

static boost::interprocess::managed_shared_memory segment(boost::interprocess::open_or_create, name_shared_memory__, addr_shared_memory__);
static void_allocator alloc_inst(segment.get_segment_manager());
vector_points = segment.construct<vector_vector3d>(name_vector_points__)(alloc_inst);

后来,程序运行过程中。偶尔会出现崩溃现象,在系统的事件管理器中看到了如下日志,像这种问题如何去捕捉ipc引起的异常呢? image

mutouyun commented 3 years ago

用 signal 捕捉一下,然后在触发的时候把调用栈打出来看看

caibf commented 3 years ago

用 signal 捕捉一下,然后在触发的时候把调用栈打出来看看

可以同您微信沟通吗?我在C++方面经验不足,这样说还不太明白怎么去做?

另外,我用了这里的Tutorial,稍微改了下,代码如下:

#include <thread>

#include "libipc/ipc.h"

int main()
{
    using namespace std::literals;

    std::size_t default_timeout = 3000; // ms

    std::vector<char const*> const datas = {
        "hello!",
        //"foo",
        //"bar",
        //"ISO/IEC",
        //"14882:2011",
        //"ISO/IEC 14882:2017 Information technology - Programming languages - C++",
        //"ISO/IEC 14882:2020",
        //"Modern C++ Design: Generic Programming and Design Patterns Applied"
    };

    // thread producer
    std::thread t1{ [&] {
        ipc::channel cc { "my-ipc-channel", ipc::sender | ipc::receiver };
        for (std::size_t i = 0; i < datas.size(); ++i) {
            std::this_thread::sleep_for(1s);
            // try sending data
            while (!cc.send(datas[i])) {
                // waiting for connection
                cc.wait_for_recv(2); // 这里等待2个连接的原因是,thread producer的cc自身也是一个连接(ipc::receiver)
            }
            std::printf("producer send: %s\n", datas[i]);

            // recv ack
            auto dd = cc.recv();
            auto str = static_cast<char*>(dd.data());
            if (str == nullptr) {
                std::printf("producer recv ack: error!\n");
            }
            else {
                std::printf("producer recv ack: %c\n", str[0]);
            }
        }
        // quit
        cc.send(ipc::buff_t('\0'));
    } };

    // thread consumer
    std::thread t2{ [&] {
        ipc::channel cc { "my-ipc-channel", ipc::sender | ipc::receiver };
        while (1) {
            auto start_time = std::chrono::system_clock::now();
            //auto dd = cc.recv();
            auto dd = cc.recv(default_timeout);
            auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start_time).count();
            if (elapsed_time >= default_timeout) {
                std::printf("consumer send timeout, elapsed_time=%d\n", elapsed_time);
                continue;
            }
            std::string str{ dd.get<char const*>(), dd.size() - 1 };
            if (str.empty())
            {
                std::printf("Recv message is empty!\n");
                continue;
            }

            std::printf("consumer recv: %s\n", str);
            // try sending ack
            //while (!cc.send(ipc::buff_t('a'))) {
            //    // waiting for connection
            //    cc.wait_for_recv(2);
            //}
            bool ret = cc.try_send(ipc::buff_t('a'), default_timeout);
            if (!ret) std::printf("consumer send timeout\n");
        }
    } };

    t1.join();
    t2.join();
}

控制台输出是这样的,好像不太对

producer send: hello!
consumer recv: hello!
producer recv ack: a
Recv message is empty!
consumer send timeout, elapsed_time=3016
consumer send timeout, elapsed_time=3007

为什么会输出“Recv message is empty!”这条信息呢?

mutouyun commented 3 years ago
    std::vector<char const*> const datas = {
        "hello!",
        //"foo",
        //"bar",
        //"ISO/IEC",
        //"14882:2011",
        //"ISO/IEC 14882:2017 Information technology - Programming languages - C++",
        //"ISO/IEC 14882:2020",
        //"Modern C++ Design: Generic Programming and Design Patterns Applied"
    };

上面这段代码里,你的准备数据只有一个,所以 datas.size() 返回值为1。 因此 producer 线程在发完一个数据之后,即退出了 for 循环,走到了

        // quit
        cc.send(ipc::buff_t('\0'));

这里发送的一字节数据,被 consumer 接收到之后,通过

            std::string str{ dd.get<char const*>(), dd.size() - 1 };

得到了一个空字符串(dd.size() - 1等于0),因此打印出了“Recv message is empty!”。

我最近比较忙,你可以邮件(orz@orzz.org)告诉我你的微信号,我来加你。

caibf commented 3 years ago

如您所述,将这里注释掉就不会打印“Recv message is empty!”这条消息了。我没注意到哈~ // cc.send(ipc::buff_t('\0'));

shenfumin commented 3 months ago

_你需要一个序列化库,比如 protobuf、capnp、flatbuf,你可以参考下这些库。 这个库里的序列化实现也可以参考下:https://github.com/qicosmos/rest_rpc_

按照博主的思路,基于cpp-ipc,我是实现了一个ipc-bus。 当前主要是pub-sub。 https://github.com/shenfumin/ipc-bus