eclipse-ecal / ecal

📦 eCAL - enhanced Communication Abstraction Layer. A high performance publish-subscribe, client-server cross-platform middleware.
https://ecal.io
Apache License 2.0

Publish and Subscribe on Separate Threads #30

Closed JeremyBYU closed 4 years ago

JeremyBYU commented 4 years ago

Describe the bug

I was wondering if eCAL can handle having a publisher on one thread and a subscriber on another thread. I am using std::thread to spawn a separate publishing thread while the main thread just subscribes to those messages.

The example below does not work. The publishing thread is spawned and messages are sent continuously. However, the OnReceive callback, which is registered on the main thread, is never fired.

I know that messages are being published correctly because a separate, independent program that only subscribes to these messages does receive them. So the issue seems to be on the main thread (receiving).

To Reproduce

#include <ecal/ecal.h>

#include <iostream>
#include <thread>

void publish()
{
    eCAL::CPublisher pub("Counter", "long long");
    // publish the current time once per second
    while(eCAL::Ok())
    {
        std::cout << "Sending data on seperate thread" << std::endl;
        long long start_time_loop = eCAL::Time::GetMicroSeconds();
        pub.Send(&start_time_loop, sizeof(long long));
        // sleep 1000 ms
        eCAL::Process::SleepMS(1000);
    }
}

void OnReceive(const char* /* topic_name_ */, const struct eCAL::SReceiveCallbackData* /* data_ */)
{
    std::cout << "Received data on main thread: " << std::endl;
}

int main(int argc, char* argv[])
{
    // initialize eCAL API
    eCAL::Initialize(0, nullptr, "RSTrackerSub");
    eCAL::CSubscriber sub("Counter", "long long");

    // setup receive callback function
    sub.AddReceiveCallback(OnReceive);

    // publish from a second thread of the same process
    auto t1 = std::thread(publish);
    // idle main thread
    while(eCAL::Ok())
    {
        // sleep 1000 ms
        std::cout << "Main thread loop waiting for messages" << std::endl;
        eCAL::Process::SleepMS(1000);
    }

    // wait for the publishing thread before shutting down
    t1.join();
    eCAL::Finalize();
}

Output:

root@33c34a5aa5d0:/opt/workspace/build# ../bin/rs-test 
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread
Main thread loop waiting for messages
Sending data on seperate thread

rex-schilasky commented 4 years ago

Hi JeremyBYU, this is a built-in optimization: by default, eCAL does NOT deliver publications to subscribers in the same process (in many cases there are more efficient options to share memory between two threads :-)).

But you can use the eCAL Util function

  eCAL::Util::EnableLoopback(true);

to switch on the loopback of your process-internal publications. So this one should work:

int main(int argc, char* argv[])
{
  // initialize eCAL API
  eCAL::Initialize(0, nullptr, "RSTrackerSub");

  // enable to receive process internal publications
  eCAL::Util::EnableLoopback(true);

  eCAL::CSubscriber sub("Counter", "long long");

  // setup receive callback function
  sub.AddReceiveCallback(OnReceive);

  auto t1 = std::thread(publish);
  // idle main thread
  while (eCAL::Ok())
  {
    // sleep 1000 ms
    std::cout << "Main thread loop waiting for messages" << std::endl;
    eCAL::Process::SleepMS(1000);
  }
}

JeremyBYU commented 4 years ago

Awesome, that seemed to work. I am aware that there are more efficient methods for intraprocess communication (between threads of the same process). However, the data that is shared between the threads will also need to be shared outside of the process (interprocess). I think the most efficient method would be a shared global variable guarded by a mutex between threads, plus eCAL for communication between separate processes. However, that seemed needlessly complex, since it means using two different communication strategies. So I decided to just stick with eCAL and accept the double copy through the shared memory buffer that eCAL should be using.
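For reference, the "shared variable with a mutex" alternative mentioned above can be sketched in plain C++ roughly as follows. This is only an illustration: `SharedSlot` and `run_shared_slot_demo` are hypothetical names, not part of eCAL, and the sketch covers only the thread-to-thread half of the problem.

```cpp
#include <mutex>
#include <thread>

// Hypothetical helper (not an eCAL type): a single value guarded by a mutex.
template <typename T>
class SharedSlot {
public:
    // Overwrite the stored value; called by the producing thread.
    void store(const T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        value_ = value;
        has_value_ = true;
    }

    // Copy the stored value into `out`; returns false if nothing was stored yet.
    bool load(T& out) const {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!has_value_) return false;
        out = value_;
        return true;
    }

private:
    mutable std::mutex mutex_;
    T value_{};
    bool has_value_ = false;
};

// Stores 1..count from a worker thread, then reads the last value
// from the calling thread. Returns -1 if nothing was stored.
long long run_shared_slot_demo(int count) {
    SharedSlot<long long> slot;
    std::thread producer([&slot, count] {
        for (int i = 1; i <= count; ++i) slot.store(i);
    });
    producer.join();

    long long last = -1;
    slot.load(last);
    return last;
}
```

Data that must also leave the process would still need a second path, which is the extra complexity described above.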

However, I want to bring to your attention that eCAL doesn't have to do the double copy through shared memory for intraprocess comms. ROS2 has an intraprocess communication strategy that does no copy for comms between threads. See here. The publishing thread, call it ThreadA, simply gives up ownership of the object, and the subscribing thread, call it ThreadB, then gets the reference. What is really fantastic about this technique is that if another, separate process, call it Process2, subscribes to this data, then Process2 will receive a copy of the object (using shared memory like eCAL) but ThreadB will continue to receive the original object with no copy.

At least that is my understanding. To summarize, ROS2 seems to have a way to have the best of both worlds: fast no copy intraprocess comms and still fast double copy shared memory interprocess comms. All using (roughly) the same API.
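The ownership hand-off idea described above can be illustrated in plain C++ with a `std::unique_ptr` moved through a queue. This is a minimal sketch of the general technique, not ROS2's or eCAL's actual API; `Message`, `HandoffQueue` and `handoff_preserves_address` are hypothetical names introduced for the example.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical payload type for the example.
struct Message {
    long long timestamp_us;
};

// Hypothetical queue that transfers ownership of messages between threads.
class HandoffQueue {
public:
    // The sender gives up ownership; only the pointer is moved, not the payload.
    void push(std::unique_ptr<Message> msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(msg));
        }
        ready_.notify_one();
    }

    // Blocks until a message is available, then hands ownership to the caller.
    std::unique_ptr<Message> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        std::unique_ptr<Message> msg = std::move(queue_.front());
        queue_.pop();
        return msg;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::unique_ptr<Message>> queue_;
};

// Returns true if the receiving thread gets the very same object
// (same address) that the sending thread allocated, i.e. no copy was made.
bool handoff_preserves_address() {
    HandoffQueue queue;
    auto msg = std::make_unique<Message>(Message{42});
    const Message* sent_address = msg.get();

    std::thread sender([&queue, &msg] { queue.push(std::move(msg)); });
    std::unique_ptr<Message> received = queue.pop();
    sender.join();

    return received.get() == sent_address && received->timestamp_us == 42;
}
```

A subscriber in another process cannot share the pointer, so it would still need a serialized copy, which matches the split described above.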

rex-schilasky commented 4 years ago

Glad to hear that it is working now. If you would really like to have zero copy intraprocess communication, you can switch it on for a single publisher this way:

  pub.SetLayerMode(eCAL::TLayer::tlayer_inproc, eCAL::TLayer::smode_auto);

See sample _cpp/samples/person_sndinproc for more details. With this setting, eCAL shares the memory with all subscriber callback functions without any copy. To avoid data races, the memory is blocked for writing while all registered callbacks are being processed. This can lead to slower performance compared to using shared memory and copying the data first, so that it can then be processed fully multithreaded.
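The trade-off described here (no copy, but the buffer stays locked while callbacks run) can be illustrated with a small stand-alone sketch. This is not eCAL's implementation; `InProcTopic` and `count_zero_copy_deliveries` are hypothetical names, and the real inproc layer may work differently in detail.

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-process topic: one shared buffer, many callbacks.
class InProcTopic {
public:
    using Callback = std::function<void(const std::string&)>;

    void subscribe(Callback cb) {
        std::lock_guard<std::mutex> lock(mutex_);
        callbacks_.push_back(std::move(cb));
    }

    // Writes the payload once, then calls every callback with a reference
    // to the same buffer. The lock is held until the last callback returns,
    // so a slow callback delays the next publish.
    void publish(const std::string& payload) {
        std::lock_guard<std::mutex> lock(mutex_);
        buffer_ = payload;
        for (const auto& cb : callbacks_) cb(buffer_);
    }

private:
    std::mutex mutex_;
    std::string buffer_;
    std::vector<Callback> callbacks_;
};

// Registers `subscriber_count` callbacks, publishes once, and returns how
// many callbacks saw the same buffer address (i.e. received no copy).
int count_zero_copy_deliveries(int subscriber_count) {
    InProcTopic topic;
    std::vector<const std::string*> seen;
    for (int i = 0; i < subscriber_count; ++i) {
        topic.subscribe([&seen](const std::string& data) { seen.push_back(&data); });
    }
    topic.publish("hello");

    int same = 0;
    for (const std::string* address : seen) {
        if (address == seen.front()) ++same;
    }
    return same;
}
```

Copying the buffer before invoking the callbacks would release the lock earlier, at the cost of one copy per delivery.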

Additionally, you can switch on the new iceoryx transport layer for zero copy interprocess communication, with the same pros and cons. This combination

  pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off);
  pub.SetLayerMode(eCAL::TLayer::tlayer_inproc, eCAL::TLayer::smode_on);
  pub.SetLayerMode(eCAL::TLayer::tlayer_iceoryx, eCAL::TLayer::smode_on);

would realize zero copy for inter- and intraprocess communication. You can configure this globally in the ecal.ini file for all running processes as default behavior or pass these settings by command line parameters.

Thank you again for your feedback!

JeremyBYU commented 4 years ago

Wow, I had no idea these options were there. I will have to keep digging through the available documentation. These options seem to be exactly what I'm looking for.