zeromq / cppzmq

Header-only C++ binding for libzmq
http://www.zeromq.org
MIT License
1.93k stars 757 forks source link

Can't communicate using IPC between Parent Process & Child Process. #560

Open devararendy opened 2 years ago

devararendy commented 2 years ago

Hi i would like to send data from child process to parent process (at main thread). But at the parent process it's not receiving the message. I have tried to set file permission to 0777 and ownership to root.

OS : ubuntu 20.04 via : Docker Libzmq : v4.3.4 CPPZMQ : v4.8.1 User : root

Step to repro :

  1. Create main thread that receiving ZeroMQ PAIR IPC
  2. Create new thread (worker thread) that do fork() then create IPC socket that send inside child process.
#include <thread>
#include <zmq_addon.hpp>
#include <iostream>
#include <vector>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

using namespace std;

void thdWorker()
{
    cout << "Thread Worker Started..." << endl;
    if (fork() == 0)
    {
        cout << "This is Child Process" << endl;

        zmq::context_t lCtx;
        zmq::socket_t socketChildProcessTx(lCtx, ZMQ_PAIR);
        socketChildProcessTx.connect("ipc:///tmp/workers.ipc");

        socketChildProcessTx.send(zmq::str_buffer("data0"), zmq::send_flags::sndmore);
        socketChildProcessTx.send(zmq::str_buffer("data1"), zmq::send_flags::sndmore);
        socketChildProcessTx.send(zmq::str_buffer("data2"), zmq::send_flags::none);
        socketChildProcessTx.close();

        cout << "Exiting Child Process" << endl;
        _exit(0);
    }
    else
    {
        cout << "This is Parent Process" << endl;

        cout << "Exiting Parent Process" << endl;
    }
    cout << "Exiting worker" << endl;
}
int main()
{
    cout << "Running Main Thread..." << endl;
    zmq::context_t parentCtx;
    zmq::socket_t socketChildProcessRx(parentCtx, ZMQ_PAIR);
    socketChildProcessRx.bind("ipc:///tmp/workers.ipc");

    // Run Worker Thread
    std::thread threadKafkaConsumer(thdWorker);
    threadKafkaConsumer.detach();

    while(1)
    {
        cout << "Main Loop ..." << endl;
        vector<zmq::message_t> msgs;
        zmq::recv_result_t result = zmq::recv_multipart(socketChildProcessRx, back_inserter(msgs));
        // Print raw data in dec for easier debugging
        cout << "Child Process: Total Msgs : " << result.value() << endl;
        for (int x=0;x<msgs.size();x++)
        {
                cout << "Message " << x << " is : " << msgs[x].to_string()  << endl;
        }
    }
}

The output is :

Running Main Thread...
Main Loop ...
Thread Worker Started...
This is Parent Process
Exiting Parent Process
Exiting worker
This is Child Process
Exiting Child Process

Is there any missing here or IPC cant be use for communication between parent & child process ?

Thank You very much

Best Regards

Rendy

devararendy commented 2 years ago

Update :

After adding 1s delay (std::this_thread::sleep_for(1000ms);) after connect(), the main thread can receive the message. but sometimes can't receive. Is there any proper way to check whether the connection is successfully estabilished ?


#include <thread>
#include <zmq_addon.hpp>
#include <iostream>
#include <vector>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <chrono>

using namespace std;

void thdWorker()
{
    cout << "Thread Worker Started..." << endl;
    if (fork() == 0)
    {
        cout << "This is Child Process" << endl;

        zmq::context_t lCtx;
        zmq::socket_t socketChildProcessTx(lCtx, ZMQ_PAIR);
        socketChildProcessTx.connect("ipc:///tmp/workers.ipc");
        std::this_thread::sleep_for(1000ms);
        socketChildProcessTx.send(zmq::str_buffer("data0"), zmq::send_flags::sndmore);
        socketChildProcessTx.send(zmq::str_buffer("data1"), zmq::send_flags::sndmore);
        socketChildProcessTx.send(zmq::str_buffer("data2"), zmq::send_flags::none);
        socketChildProcessTx.close();

        cout << "Exiting Child Process" << endl;
        _exit(0);
    }
    else
    {
        cout << "This is Parent Process" << endl;

        cout << "Exiting Parent Process" << endl;
    }
    cout << "Exiting worker" << endl;
}
int main()
{
    cout << "Running Main Thread..." << endl;
    zmq::context_t parentCtx;
    zmq::socket_t socketChildProcessRx(parentCtx, ZMQ_PAIR);
    socketChildProcessRx.bind("ipc:///tmp/workers.ipc");

    // Run Worker Thread
    std::thread threadKafkaConsumer(thdWorker);
    threadKafkaConsumer.detach();

    while(1)
    {
        cout << "Main Loop ..." << endl;
        vector<zmq::message_t> msgs;
        zmq::recv_result_t result = zmq::recv_multipart(socketChildProcessRx, back_inserter(msgs));
        // Print raw data in dec for easier debugging
        cout << "Child Process: Total Msgs : " << result.value() << endl;
        for (int x=0;x<msgs.size();x++)
        {
                cout << "Message " << x << " is : " << msgs[x].to_string()  << endl;
        }
    }
} 

Output :


Running Main Thread...
Main Loop ...
Thread Worker Started...
This is Parent Process
Exiting Parent Process
Exiting worker
This is Child Process
Exiting Child Process
Child Process: Total Msgs : 3
Message 0 is : data0
Message 1 is : data1
Message 2 is : data2
Main Loop ...
gummif commented 2 years ago

I don´t think there is a way in zmq to check if anyone is connected. You probably need to implement something yourself, like heartbeats, ping or something else.

devararendy commented 2 years ago

I don´t think there is a way in zmq to check if anyone is connected. You probably need to implement something yourself, like heartbeats, ping or something else.

Thank you for you reply, i see. But is that normal, the connection took around 0.5 - 1 sec to be connected ?