servo / ipc-channel

A multiprocess drop-in replacement for Rust channels
Apache License 2.0
846 stars 126 forks source link

Why can I only connect to IpcSender and not IpcReceiver? #306

Closed patientplatypus6 closed 1 year ago

patientplatypus6 commented 1 year ago

Ok so this is a stupid question, but it seems like it should be obvious.

Why can't I connect to IpcReceiver? So I make a IpcOneShotServer and then I can

let tx0 = IpcSender::connect(name).unwrap(); and tx0.send(tx1).unwrap();

So far so good.

But if I have a main process that creates the server and then spawns spawned process I want spawned process to be able to do this

let tx0 = IpcReceiver::connect(name).unwrap(); and tx0.receiveall().unwrap();

There's let response = rx.recv().unwrap();

and

loop {
    match rx.try_recv() {
        Ok(res) => {
            // Do something interesting wth your result
            println!("Received data...");
            break;
        },
        Err(_) => {
            // Do something else useful while we wait
            println!("Still waiting...");
        }
    }
}

BUT

let (tx, rx) = ipc::channel().unwrap();

doesn't take a server name but creates a new one, so I can only send and receive within the process I have unless I serialize and deserialize the IpcOneShotServer and send it through to the spawned process. That doesn't sound right.

patientplatypus6 commented 1 year ago

In other words IpcSender has a

pub fn connect(name: String) -> Result<IpcSender, Error> [src] [−]

Create an IpcSender connected to a previously defined IpcOneShotServer.

But IpcReciever can't connect to a previously defined server? Why not?

jdm commented 1 year ago

I think the workaround here is the following (based on this docs example):

The spawned process can now receive on r1 and the main process can send data on the new sender.

patientplatypus6 commented 1 year ago

Running into a problem. This compiles -

    let (receiver, sendervec): (IpcReceiver<Vec<(String,String)>>, Vec<_>) = server0.accept().unwrap();

But this doesn't match the type signature as given in the example

let (_, tx1): (_, IpcSender<Vec<u8>>) = server.accept().unwrap();

See here. So the step accept the new connection and resulting sender in the main process doesn't work.

In my code -

fn main() {
    println!("Inside process_handler");
    let datavec = vec![("Peter".to_string(), "36".to_string())];
    let (server0, server_name0) = IpcOneShotServer::<Vec<(String, String)>>::new().unwrap();
    println!("server_name0 in process_handler: {:?}", server_name0);
    let guiserver = spawn_server(
        "/Users/peterweyand/Code/rustprojects/project1_2/src/rungui.sh".to_string(), 
        &server_name0
    );

    let (receiver, sendervec): (IpcReceiver<Vec<(String,String)>>, Vec<_>) = server0.accept().unwrap();

    // sendervec.send(datavec); - this won't work as sendervec is of type Vec<_> and not of type IpcSender<Vec<(String, String)>>
    loop {
        std::thread::sleep_ms(1000);
    }
}
patientplatypus6 commented 1 year ago

Ok, this is close but I have an error - I'm working with the type signature of accept() as it is in the code. (If this is wrong you may want to consider updating the documentation).

In the main process

fn main() {
    println!("Inside process_handler");
    let (server0, server_name0) = IpcOneShotServer::<Vec<(String, String)>>::new().unwrap();
    println!("server_name0 in process_handler: {:?}", server_name0);
    let guiserver = spawn_server(
        "/Users/peterweyand/Code/rustprojects/project1_2/src/rungui.sh".to_string(), 
        &server_name0
    );
    let (receiver, sendervec): (IpcReceiver<Vec<(String,String)>>, Vec<_>) = server0.accept().unwrap();
    loop {
        match receiver.try_recv() {
            Ok(res) => {
                // Do something interesting with your result
                std::thread::sleep_ms(1000);
                println!("Received data...{:?}", res);
                //I don't know what type signature res is supposed to have, but it should receive the datavec. Then I can update that in main and in gui I can access via r1
                break;
            },
            Err(_) => {
                // Do something else useful while we wait
                std::thread::sleep_ms(1000);
                println!("Still waiting...");
            }
        }
    }
}

In the spawned process

fn main() {
    println!("Inside GUI process");
    let datavec = vec![("Peter".to_string(), "36".to_string())];
    let args: Vec<String> = env::args().collect();
    let (s1, r1): (IpcSender<Vec<(String, String)>>, IpcReceiver<Vec<(String, String)>>) = ipc::channel().unwrap();
    s1.send(datavec).unwrap();
    let s0 = IpcSender::connect(args[1].clone()).unwrap();
    s0.send(s1).unwrap();
    loop {
        std::thread::sleep_ms(1000);
    }
}

But I'm getting this error -

thread 'main' panicked at 'assertion failed: self.port == MACH_PORT_NULL', /Users/peterweyand/.cargo/registry/src/github.com-1ecc6299db9ec823/ipc-channel-0.16.0/src/platform/macos/mod.rs:600:9

What's going on here?

jdm commented 1 year ago

That snippet appears to be the opposite of what you were asking about. I don't actually know how the receiver that is returned from accept() is allowed to be used; it's certainly not documented anywhere.

Here's a all-in-one example of the spawned process waiting on data sent from the parent process:

use ipc_channel::ipc::{self, *};
use std::env;
use std::process::{self, Command};

pub fn spawn_server(_ignored: &str, arg: &str) -> process::Child {
    Command::new(env::current_exe().unwrap())
        .arg(arg)
        .spawn()
        .expect("failed to execute server process")
}

type Data = (String, String);

fn main() {
    if env::args().len() == 1 {
        println!("Inside process_handler");
        let (server0, server_name0) = IpcOneShotServer::<IpcSender<Data>>::new().unwrap();
        println!("server_name0 in process_handler: {:?}", server_name0);
        let guiserver = spawn_server(
            "/Users/peterweyand/Code/rustprojects/project1_2/src/rungui.sh",
            &server_name0
        );
        let (_receiver, sender): (IpcReceiver<IpcSender<Data>>, IpcSender<Data>) = server0.accept().unwrap();
        let data = ("Peter".to_string(), "36".to_string());
        sender.send(data);
        loop {
            std::thread::sleep_ms(1000);
        }
    } else {
        println!("Inside GUI process");
        let args: Vec<String> = env::args().collect();
        let (s1, r1): (IpcSender<Data>, IpcReceiver<Data>) = ipc::channel().unwrap();
        let s0 = IpcSender::connect(args[1].clone()).unwrap();
        s0.send(s1).unwrap();
        let data = r1.recv().unwrap();
        println!("received {:?}", data);
        loop {
            std::thread::sleep_ms(1000);
        }
    }
}
patientplatypus6 commented 1 year ago

Here's a close example to a trivial example of making this work. I can now send data from the top level thread to the GUI folder.

So I can define data here:

https://github.com/patientplatypus6/servoproblemexample/blob/main/src/main.rs#L20

and read it here:

https://github.com/patientplatypus6/servoproblemexample/blob/main/gui/src/main.rs#L20

However, I don't know how to do the reverse, and send data from the subfolder to the main folder. Why would I want to do this? So I have a server that spawns several processes (p1 and p2 for example) that require that they send data to the main server and then the main server can send data to the other process (so data from p1->server->p2 and p2->server->p1). Here's where I'm stuck.

    let (_receiver, sender): (IpcReceiver<IpcSender<Data>>, IpcSender<Data>) = server0.accept().unwrap();

    let data = vec![("Peter".to_string(), "36".to_string())];
    println!("value of _receiver {:?}", _receiver);
    sender.send(data);

Can send data to the IpSender from accept().unwrap() but there's no receiver section. The _receiver is a wrapped sender, so how do receive data from the IpOneShotServer?

jdm commented 1 year ago

Here's a modification that allows bidirectional sending and receiving in both processes:

use ipc_channel::ipc::{self, *};
use std::env;
use std::process::{self, Command};

pub fn spawn_server(_ignored: &str, arg: &str) -> process::Child {
    Command::new(env::current_exe().unwrap())
        .arg(arg)
        .spawn()
        .expect("failed to execute server process")
}

type Data = (String, String);
type Bootstrap = (IpcSender<Data>, IpcReceiver<Data>);

fn main() {
    if env::args().len() == 1 {
        println!("Inside process_handler");
        let (server0, server_name0) = IpcOneShotServer::<Bootstrap>::new().unwrap();
        println!("server_name0 in process_handler: {:?}", server_name0);
        let guiserver = spawn_server(
            "/Users/peterweyand/Code/rustprojects/project1_2/src/rungui.sh",
            &server_name0
        );
        let (_receiver, (sender, receiver)): (IpcReceiver<Bootstrap>, Bootstrap) = server0.accept().unwrap();
        let data = ("Peter".to_string(), "36".to_string());
        sender.send(data);
        let received = receiver.recv().unwrap();
        println!("parent received {:?}", received);
        loop {
            std::thread::sleep_ms(1000);
        }
    } else {
        println!("Inside GUI process");
        let args: Vec<String> = env::args().collect();
        let (to_child, from_parent): (IpcSender<Data>, IpcReceiver<Data>) = ipc::channel().unwrap();
        let (to_parent, from_child): (IpcSender<Data>, IpcReceiver<Data>) = ipc::channel().unwrap();
        let bootstrap = IpcSender::connect(args[1].clone()).unwrap();
        bootstrap.send((to_child, from_child)).unwrap();
        let data = from_parent.recv().unwrap();
        println!("child received {:?}", data);
        to_parent.send(("Dagne".to_string(), "8".to_string()));
        loop {
            std::thread::sleep_ms(1000);
        }
    }
}

Whatever you communicate in the initial bootstrap connection gives you the tools you need.

patientplatypus6 commented 1 year ago

Thanks, that's what I'm looking for. Consider using this as an example in the examples folder. It shouldn't take several days of back and forth with the owner of the repo to set up bidirectional communication between a server and a spawned process. That should be a minimal example so that developers can use the tool.