eclipse-iceoryx / iceoryx

Eclipse iceoryx™ - true zero-copy inter-process-communication
https://iceoryx.io
Apache License 2.0

How to handle messages in parallel with popo::listener? #2302

Closed alan0526 closed 3 months ago

alan0526 commented 3 months ago

Is there an example of executing tasks on a thread pool from the Listener callback of an untyped server? I tried to use BS::thread_pool to execute tasks in parallel, but the server always crashes after handling some requests. This is my code; I use the listener in a class:

listener.attachEvent(
            *server, iox::popo::ServerEvent::REQUEST_RECEIVED,
            iox::popo::createNotificationCallback(on_event, *this))
        .or_else([](auto) {
            std::cerr << "unable to attach server" << std::endl;
            std::exit(EXIT_FAILURE);
        });

std::string process_method(std::string_view method_name, std::string_view arguments) const {
    if(!_methods.contains(method_name.data())) {
        return std::format("method: {} is not exist", method_name);
    }

    return _methods.at(method_name.data()).handler(arguments);
}

static void on_event(iox::popo::UntypedServer* server, ServerBase* self) {
    server->take().and_then([server, self](const auto& request_payload) {
        auto e_id = server->getServiceDescription().getEventIDString().c_str();
        auto payload = static_cast<const char*>(request_payload);
        auto request_header = iox::popo::RequestHeader::fromPayload(request_payload);
        auto chunk_header = request_header->getChunkHeader();
        auto user_payload_size = chunk_header->userPayloadSize();
        std::string ret_method;
        try {
            ret_method = self->process_method(e_id, std::string_view{ payload, user_payload_size });
        }
        catch (...) {
        }
        server->loan(request_header, ret_method.size(), 1)
              .and_then([&](auto& response_payload) {
                  auto response = static_cast<uint8_t*>(response_payload);
                  if (!ret_method.empty())
                      std::memcpy(response, ret_method.c_str(), ret_method.size());
                  server->send(response).or_else(
                      [&](auto& error) {
                          std::cout << "Could not send Response! Error: " << error << std::endl;
                      });
              })
              .or_else([&](auto& error) {
                  std::cout << "Could not allocate Response! Error: " << error << std::endl;
              });
        server->releaseRequest(payload);
    }).or_else([&](auto& error) {
        std::cout << "Could not take event! Error: " << error << std::endl;
    });
}

alan0526 commented 3 months ago

Sorry, it seems that I was using the wrong version.

elfenpiff commented 3 months ago

@alan0526 No worries. We are happy that the issue was solved on its own :smile:

elBoberido commented 3 months ago

@alan0526 Also, make sure that the sample lives long enough if you use the pointer to the payload from another thread. Ideally, the whole sample would just be moved to the thread from the thread pool. This is usually easier to do with the WaitSet than with the Listener, since the server needs to be guarded with a mutex when accessing it from multiple threads, e.g. when sending a response. This cannot be done with the Listener since it also accesses the Server internally from an implicitly started thread.
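
The two points above (hand ownership of the sample to the worker, and guard the server with a mutex) can be sketched without iceoryx at all. Everything in this sketch is a hypothetical stand-in and not iceoryx API: `OwnedRequest` plays the role of a sample that keeps its payload alive, `FakeServer` plays the role of a server that is not thread-safe on its own, and `std::async` replaces the thread pool. It only illustrates the ownership and locking pattern, assuming a WaitSet-style loop in which the application controls every access to the server.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an owning sample: the payload stays valid for as
// long as this object is alive, no matter which thread it was moved to.
struct OwnedRequest {
    std::unique_ptr<std::string> payload;
};

// Hypothetical stand-in for a server that is not thread-safe by itself.
class FakeServer {
  public:
    void send(std::string response) { sent_.push_back(std::move(response)); }
    const std::vector<std::string>& sent() const { return sent_; }

  private:
    std::vector<std::string> sent_;
};

// Runs on a worker thread. The worker owns the request, so the payload cannot
// be released underneath it while the response is being computed.
void handle(OwnedRequest request, FakeServer& server, std::mutex& server_mutex) {
    assert(request.payload != nullptr);
    std::string response = "echo:" + *request.payload;

    // Only the access to the shared server is serialized, not the computation.
    std::lock_guard<std::mutex> lock(server_mutex);
    server.send(std::move(response));
}

// Dispatches `request_count` requests to worker threads and returns how many
// responses were sent once all workers have finished.
std::size_t run_demo(std::size_t request_count) {
    FakeServer server;
    std::mutex server_mutex;
    std::vector<std::future<void>> workers;

    for (std::size_t i = 0; i < request_count; ++i) {
        OwnedRequest request{std::make_unique<std::string>("req" + std::to_string(i))};
        // Move the whole request into the task instead of passing a raw pointer.
        workers.push_back(std::async(std::launch::async, handle, std::move(request),
                                     std::ref(server), std::ref(server_mutex)));
    }
    for (auto& worker : workers) {
        worker.get(); // wait for completion; also synchronizes with the worker
    }
    return server.sent().size();
}
```

Calling `run_demo(8)` from a `main` function dispatches eight requests and returns the number of responses sent. In a real application, the loop that takes requests from the server would presumably need to hold the same mutex, since it touches the server as well.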