rustasync / team

How do we multicast using Rust's net? #81

Closed nbro closed 5 years ago

nbro commented 5 years ago

I'm using rustc 1.30.1. I am doing everything on the same machine.

Rust's net provides a way to join a multicast group, using join_multicast_v4. But I am unable to create an example, using just net, where I can actually multicast, that is, send a message to a multicast group B and have all sockets associated with B receive it.

Here's the code I am using to try to implement multicasting in Rust using just net.

use std::net::Ipv4Addr;
use std::net::UdpSocket;
use std::str::FromStr;
use std::thread;
use std::time::Duration;

fn main() {
    // Host of group A.
    let proposers_host = "239.0.0.1";

    // Host of group B.
    let acceptors_host = "239.0.0.1";

    let acceptors_address = "239.0.0.1:7000";

    let join_handle1: thread::JoinHandle<_> = thread::spawn(move || {
        println!("{}", "Spawning 1st thread");

        let socket = UdpSocket::bind("0.0.0.0:0").expect("Couldn't bind proposer UDP socket");

        // Does NOT work even if I uncomment the following.
        // socket
        //     .set_multicast_loop_v4(true)
        //     .expect("Could not enable multicast loop, to send packets back to the local socket");

        // This socket joined this group. Let's call this group A.
        socket
            .join_multicast_v4(
                &Ipv4Addr::from_str(proposers_host).unwrap(),
                &Ipv4Addr::UNSPECIFIED,
            ).expect("Could not join multicast group A");

        for i in 1..10 {
            // Send message to the acceptors that joined the multicast group B.
            socket
                .send_to(&[i], acceptors_address)
                .expect("couldn't send data");
            println!("Sent message: {:?}\n---\n", i);

            thread::sleep(Duration::from_millis(1));
        }
    });

    let join_handle2: thread::JoinHandle<_> = thread::spawn(move || {
        println!("{}", "Spawning 2nd thread");

        let socket = UdpSocket::bind("0.0.0.0:0").expect("Could not bind acceptor 1 UDP socket");

        // Joining group B.
        socket
            .join_multicast_v4(
                &Ipv4Addr::from_str(acceptors_host).unwrap(),
                &Ipv4Addr::UNSPECIFIED,
            ).expect("Could not join multicast group B");

        let mut buf = [0; 10];
        let mut c = 0;
        loop {
            let (number_of_bytes, src_addr) =
                socket.recv_from(&mut buf).expect("Didn't receive data");

            let filled_buf = &mut buf[..number_of_bytes];

            println!("I am the 2nd socket");
            println!("Message received from address = {:?}", src_addr);
            println!("Contents of the message = {:?}\n---\n", filled_buf);

            thread::sleep(Duration::from_millis(1));

            c += 1;
            if c == 5 {
                break;
            }
        }
    });

    let join_handle3: thread::JoinHandle<_> = thread::spawn(move || {
        println!("{}", "Spawning 3rd thread");

        let socket = UdpSocket::bind("0.0.0.0:0").expect("Could not bind acceptor 2 UDP socket");

        socket
            .join_multicast_v4(
                &Ipv4Addr::from_str(acceptors_host).unwrap(),
                &Ipv4Addr::UNSPECIFIED,
            ).expect("Could not join multicast group B");

        let mut buf = [0; 10];
        loop {
            let (number_of_bytes, src_addr) =
                socket.recv_from(&mut buf).expect("Didn't receive data");

            let filled_buf = &mut buf[..number_of_bytes];

            println!("I am the 3rd socket");
            println!("Message received from address = {:?}", src_addr);
            println!("Contents of the message = {:?}\n---\n", filled_buf);

            thread::sleep(Duration::from_millis(1));
        }
    });

    println!("{}", "At the end");

    join_handle1.join().unwrap();
    join_handle2.join().unwrap();
    join_handle3.join().unwrap();
}

It's not clear to me why Rust's net provides a method join_multicast_v4, but the example above does not work as expected, i.e. the "acceptors" do not receive anything!

I can't bind two sockets to the same port (at least locally, and I am doing this locally, but eventually the acceptors could be on different machines). If I bind sockets 2 and 3 to the same port 7000, one of the acceptors (either 2 or 3) receives the messages, but the other panics because I can't bind two sockets to the same port. You can test this behaviour by changing the lines above (in the 2nd and 3rd socket) from

let socket = UdpSocket::bind("0.0.0.0:0").expect("...");

to, e.g.,

let socket = UdpSocket::bind("0.0.0.0:7000").expect("...");

And you will get a panic, as I just explained.
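The bind conflict can be reproduced in isolation with std alone. The following is a minimal sketch, not part of the original program: the function name second_bind_error_kind is hypothetical, and it uses an OS-chosen port rather than 7000 so that it does not collide with anything already running.

```rust
use std::io;
use std::net::UdpSocket;

/// Binds one UDP socket to an OS-chosen port, then tries to bind a second
/// socket to exactly the same address and port while the first is still open.
/// Returns the error kind of the second attempt, or `None` if the second
/// bind unexpectedly succeeded.
fn second_bind_error_kind() -> io::Result<Option<io::ErrorKind>> {
    let first = UdpSocket::bind("0.0.0.0:0")?;
    // Ask the OS which port it picked for the first socket.
    let addr = first.local_addr()?;

    // `first` is still alive here, so the port is taken.
    let second = UdpSocket::bind(addr);
    Ok(second.err().map(|e| e.kind()))
}
```

On Linux the second bind is expected to fail with io::ErrorKind::AddrInUse, which is the error behind the panic in the expect call.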

So, how do I multicast using Rust's net? Can you please provide a simple example where there are multiple receivers (or acceptors) that receive the message sent by the same sender?

yoshuawuyts commented 5 years ago

@nbro my understanding is that each listener needs to be bound to a separate port, and you can use the send_to() method to send data to another port. By design only one listener can listen on each port.

edit: @nbro also consider submitting this as a question on StackOverflow, where there are more people that might answer your question!

nbro commented 5 years ago

@yoshuawuyts But if I need to explicitly send to each listener, then it's not a multicast operation, but simply a unicast to each receiver/listener.
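The difference is visible in code: with one port per listener, the sender has to know every listener address and loop over them. The following is a minimal sketch of that fan-out on loopback, assuming std only. The name unicast_fan_out and the 2-second read timeout are illustrative choices, not anything from the thread.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Sends `payload` once per listener (one unicast datagram each) and returns
/// what each listener received, in listener order.
fn unicast_fan_out(payload: &[u8], listener_count: usize) -> io::Result<Vec<Vec<u8>>> {
    // Each listener gets its own OS-assigned port on loopback.
    let mut listeners = Vec::new();
    for _ in 0..listener_count {
        let socket = UdpSocket::bind("127.0.0.1:0")?;
        // Avoid blocking forever if a datagram is lost.
        socket.set_read_timeout(Some(Duration::from_secs(2)))?;
        listeners.push(socket);
    }

    let sender = UdpSocket::bind("127.0.0.1:0")?;
    // The sender must enumerate the listeners itself: N listeners, N sends.
    for listener in &listeners {
        let target: SocketAddr = listener.local_addr()?;
        sender.send_to(payload, target)?;
    }

    let mut received = Vec::new();
    for listener in &listeners {
        let mut buf = [0u8; 64];
        let (len, _src) = listener.recv_from(&mut buf)?;
        received.push(buf[..len].to_vec());
    }
    Ok(received)
}
```

With real multicast the sender would issue a single send_to to the group address and never see the list of receivers.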

Nemo157 commented 5 years ago

Multicast is normally about sending to multiple hosts or interfaces, not sending to multiple listeners in the same process. The simplest example would be to take your acceptor thread with the correct port specified and run it on two devices on the same subnet. Use a 224 address instead of 239 so that your packets will make it through other network equipment between the devices (at least, my understanding of IPv4 multicast address ranges is that supporting 224 is mandatory, while 239 is more an address range whose behaviour you need to configure on your switches). Then run your proposer code on one of the devices.
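When picking a test address, it can help to check which multicast block it falls in. The following is a rough sketch using only std. The enum and function names are hypothetical, and it covers just the two ranges mentioned here: 224.0.0.0/24 (the local network control block, which routers do not forward) and 239.0.0.0/8 (administratively scoped). How a given network treats either range is still an assumption to verify against its own equipment.

```rust
use std::net::Ipv4Addr;

/// Coarse classification of an IPv4 address by multicast block.
#[derive(Debug, PartialEq)]
enum MulticastBlock {
    /// Outside 224.0.0.0/4, so not a multicast address at all.
    NotMulticast,
    /// 224.0.0.0/24: local network control block, link-local only.
    LocalNetworkControl,
    /// 239.0.0.0/8: administratively scoped addresses.
    AdministrativelyScoped,
    /// Any other multicast address.
    Other,
}

fn classify(addr: Ipv4Addr) -> MulticastBlock {
    let o = addr.octets();
    if !addr.is_multicast() {
        MulticastBlock::NotMulticast
    } else if o[0] == 224 && o[1] == 0 && o[2] == 0 {
        MulticastBlock::LocalNetworkControl
    } else if o[0] == 239 {
        MulticastBlock::AdministrativelyScoped
    } else {
        MulticastBlock::Other
    }
}
```

The address 239.0.0.1 used in the question classifies as AdministrativelyScoped, and 224.0.0.1 as LocalNetworkControl.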

If you really want to have multiple listeners on the same device, whether in the same process or not, then you'll need to use something like SO_REUSEPORT to allow multiple listeners bound to the same port on the same interface. That gets into the non-portable network APIs that std doesn't provide (and I have no experience with).
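Within those limits, std alone can still exercise the one-listener-per-port case on a single machine. The key is that the receiver binds to a known port and the sender addresses the group on that same port. The following is a minimal sketch, assuming the machine has a multicast-capable interface and that looped-back multicast is delivered locally. The name multicast_round_trip and the 2-second timeout are illustrative.

```rust
use std::io;
use std::net::{Ipv4Addr, SocketAddrV4, UdpSocket};
use std::time::Duration;

/// Joins `group` on one receiver socket, sends `payload` to the group from a
/// second socket, and returns the bytes the receiver got.
fn multicast_round_trip(group: Ipv4Addr, payload: &[u8]) -> io::Result<Vec<u8>> {
    // Receiver: bind to a concrete port (OS-chosen here), then join the group.
    let receiver = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
    receiver.set_read_timeout(Some(Duration::from_secs(2)))?;
    receiver.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)?;
    let port = receiver.local_addr()?.port();

    // Sender: it does not need to join the group, only to address it,
    // using the port the receiver is actually bound to.
    let sender = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
    sender.set_multicast_loop_v4(true)?;
    sender.send_to(payload, SocketAddrV4::new(group, port))?;

    let mut buf = [0u8; 64];
    let (len, _src) = receiver.recv_from(&mut buf)?;
    Ok(buf[..len].to_vec())
}
```

A second receiver on the same port would fail at bind, which is where std stops and socket options such as SO_REUSEPORT or SO_REUSEADDR come in.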

You will almost certainly have a higher chance of getting useful answers on StackOverflow. Doing something like this is pretty niche, so having a wider audience of potential answerers would really help. You could also look into the same thing done in C with the POSIX APIs; in terms of configuring the UdpSocket for this behaviour, what Rust provides is almost identical.

nbro commented 5 years ago

@Nemo157

> Multicast is normally about sending to multiple hosts or interfaces, not sending to multiple listeners in the same process.

Yes, but how do you test your program locally? You may not have access to several devices and you still want to perform multicasting.

> If you really want to have multiple listeners on the same device, whether in the same process or not, then you'll need to use something like SO_REUSEPORT to allow multiple listeners bound to the same port on the same interface.

But net2 provides such an option, using the method reuse_address. I was actually able to create the multicast example I was looking for by using reuse_address from net2, but I need to include another crate only for this.

Why isn't reuse_address incorporated in net yet? Programming languages such as Python provide such an option.

> That gets into the non-portable network APIs that std doesn't provide (and I have no experience with).

Why is it non-portable? Why do languages like Python provide such an option?

Nemo157 commented 5 years ago

See this StackOverflow answer for the portability issues. I don't think reuse_address is guaranteed to work for your use case on all operating systems.

yoshuawuyts commented 5 years ago

I did some research on UDP multicast in Rust, and I believe we might need to RFC socket builders for std. I would appreciate it if people could review the notes.

https://paper.dropbox.com/doc/2019-03-26-Notes-on-Socket-Reuse-in-Std--AaAi9D5s78ZO3Q~gyVLz0qvcAg-fPOVM2WAjbvQepV05MTMJ

I'm currently confirming with the libs team that this is indeed correct.