eclipse-iceoryx / iceoryx2

Eclipse iceoryx2™ - true zero-copy inter-process-communication in pure Rust
https://iceoryx.io
Apache License 2.0
1.03k stars 40 forks source link

ServiceType use 'Local', subscriber can not receive data, in C++ binding language examples #496

Closed xfsaid closed 1 week ago

xfsaid commented 3 weeks ago

Operating system: Ubuntu20.04.2 x86

Version: cmake : 3.31.0-rc3 cargo : 1.82.0 gcc : 9.4.0 g++ : 9.4.0

Files: examples/cxx/publish_subscribe/src/publisher.cpp examples/cxx/publish_subscribe/src/subscriber.cpp

elfenpiff commented 3 weeks ago

@xfsaid ServiceType::Local means that you can only communicate in one single process between threads. Therefore, when you use ServiceType::Local all publishers and subscribers must be in the same process - they can be in different threads though.

Does this explanation solve your issue?

xfsaid commented 3 weeks ago

OK, thank you very much. I am not familiar with the interface. When I used the "IPC" option, I found that the larger the structure to be transmitted, the slower the transmission speed. I thought that the "LOCAL" option was used for transmission through shared memory. I misunderstood. However, why does the time required for transmission increase as the structure increases?

elfenpiff commented 3 weeks ago

@xfsaid The transmission latency (speed) should stay constant - independent of the structure size. Could you share your code snippet here that shows how you send and receive your large structure?

Are you using the loan and send API or do you call send_copy which creates a copy of the transmitted data which has a significant performance impact?

xfsaid commented 1 week ago

@elfenpiff I know the reason. My publisher sends messages at a frequency of 1000Hz. I added an array of 1M to the test structure (path: examples/cxx/publish_subscribe/src/transmission_data.hpp). struct TransmissionData { std::int32_t x; std::int32_t y; double funky; char c[1024*1024]; } I found that the efficiency of the program is affected by the size of the array. By default, when the max_slice_len interface is not called, the CPU usage will increase from 3% to 35% when the array is larger than 1M.

Now I know that the publisher needs to call this interface: max_slice_len(uint64_t). auto publisher = service.publisher_builder() .max_slice_len(1024 * 1024 + 100) .create();

elfenpiff commented 1 week ago

@xfsaid

Please do not use max_slice_len in this case. It is an API-Bug on our side that this is available for non-iox::Slice types. The idea behind this call is that you can define the length of the dynamic array you want to handle - we call in this case slice.

Translated to your use case, it would mean if your construct the service with a payload of iox::Slice<TransmissionData> to have a dynamic array of TransmissionData elements.

auto service = node.service_builder(ServiceName::create("Service With Dynamic Data")
                   .publish_subscribe<iox::Slice<TransmissionData>>()
                   .open_or_create()
                   .expect("successful service creation/opening");

But this is not want you want in your use case!

When TransmissionData has a size of ~ 1MB and you allocate it with publisher.loan() then the whole 1MB is always initialized, meaning that all elements of TransmissionData::c are zeroed which is very CPU demanding. This is why we offer publisher.loan_uninit(). It returns you uninitialized memory and the CPU usage should be decreased.

The summary:

xfsaid commented 1 week ago

@elfenpiff Thank you very much for your explanation. To add, I did the test on ARM hardware. But I always used the loan_uninit interface. After my test, I found that the high CPU usage was caused by calling the interface "sample.write_payload". After commenting this line of code, although the data received by the subscriber is 0, the CPU usage will be very low.

I'll post my test code.

examples/cxx/publish_subscribe/src/transmission_data.hpp

#ifndef IOX2_EXAMPLES_TRANSMISSION_DATA_HPP
#define IOX2_EXAMPLES_TRANSMISSION_DATA_HPP

#include <cstdint>
#include <iostream>

struct TransmissionData {
    std::int32_t x;
    std::int32_t y;
    double funky;
    char c[1024*1024];
};

inline auto operator<<(std::ostream& stream, const TransmissionData& value) -> std::ostream& {
    stream << "TransmissionData { x: " << value.x << ", y: " << value.y << ", funky: " << value.funky << " }";
    return stream;
}

#endif

examples/cxx/publish_subscribe/src/subscriber.cpp

#include <iostream>

#include "iox/duration.hpp"
#include "iox2/node.hpp"
#include "iox2/service_name.hpp"
#include "iox2/service_type.hpp"
#include "transmission_data.hpp"

constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromMilliseconds(1); // fromMilliseconds fromSeconds

auto main() -> int {
    using namespace iox2;
    auto node = NodeBuilder().create<ServiceType::Ipc>().expect("successful node creation");

    auto service = node.service_builder(ServiceName::create("My/Funk/ServiceName").expect("valid service name"))
                       .publish_subscribe<TransmissionData>()
                    //    .history_size(100)
                       .subscriber_max_buffer_size(200) // 决定了缓存的大小
                       .subscriber_max_borrowed_samples(200) // 订阅者可以借用的最大样本数量
                       .open_or_create()
                       .expect("successful service creation/opening");

    auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation");
    std::cout << "history_size:" << service.static_config().history_size() << std::endl;
    std::cout << "subscriber_max_borrowed_samples:" << service.static_config().subscriber_max_borrowed_samples() << std::endl;

    int counter = 0;
    while (node.wait(CYCLE_TIME).has_value()) {
        auto sample = subscriber.receive().expect("receive succeeds");
        while (sample.has_value()) {
            counter += 1;
            if (counter % 1000 == 0)
            {
                std::cout << "received: " << sample->payload() << ", counter: " << counter << std::endl;
            }
            sample = subscriber.receive().expect("receive succeeds");
        }
    }

    std::cout << "exit" << std::endl;

    return 0;
}

examples/cxx/publish_subscribe/src/publisher.cpp

#include "iox/duration.hpp"
#include "iox2/node.hpp"
#include "iox2/sample_mut.hpp"
#include "iox2/service_name.hpp"
#include "iox2/service_type.hpp"
#include "transmission_data.hpp"

#include <iostream>
#include <utility>

constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromMilliseconds(1); // fromMilliseconds fromSeconds

auto main() -> int {
    using namespace iox2;
    auto node = NodeBuilder().create<ServiceType::Ipc>().expect("successful node creation");

    auto service = node.service_builder(ServiceName::create("My/Funk/ServiceName").expect("valid service name"))
                       .publish_subscribe<TransmissionData>()
                       .subscriber_max_buffer_size(200) // 决定了缓存的大小
                       .subscriber_max_borrowed_samples(200) // 订阅者可以借用的最大样本数量
                       .open_or_create()
                       .expect("successful service creation/opening");

    auto publisher = service.publisher_builder()
                        // .max_slice_len(1024 * 1024 + 100)
                        .create().expect("successful publisher creation");

    auto counter = 0;
    while (node.wait(CYCLE_TIME).has_value() && counter < (1000 * 60 * 60 + 5)) {
        counter += 1;

        auto sample = publisher.loan_uninit().expect("acquire sample");

        sample.write_payload(TransmissionData { counter, counter * 3, counter * 812.12 }); // NOLINT
        auto initialized_sample = assume_init(std::move(sample));

        send(std::move(initialized_sample)).expect("send successful");

        if (counter % 1000 == 0)
        {
            std::cout << "Send sample " << counter << " ..." << std::endl;
        }

    }

    std::cout << "exit" << std::endl;

    return 0;
}
elfenpiff commented 1 week ago

@xfsaid the high CPU usage is expected in the line sample.write_payload(...). Memory read/write operations are always expensive this is why we try do reduce them as much as possible and use zero-copy communication in the first place.

But you can optimize it further by reducing the memory write operations. sample.write_payload(...) creates a new TransmissionData instance and copies this in the shared memory which is maybe unnecessary when not all member need to be initialized. Let's assume you only need to initialize x, y, and the first 100 elements of c. You can do this explicitly and leave the rest uninitialized which saves you a lot of memory write operations.

auto sample = publisher.loan_uninit().expect("acquire sample");
sample->x = 123;
sample->y = 856;
for(auto n = 0; n < 100; ++n) {
  sample->c[n] = 5;
}
xfsaid commented 1 week ago

@elfenpiff Thank you very much for your explanation, I understand.

Regarding the problem that many header files are missing after executing the "make install" command, I will create a new issue.