tokio-rs / tokio-core

I/O primitives and event loop for async I/O in Rust
Apache License 2.0
638 stars 115 forks

Allow try_clone on TcpStream #198

Open stackjohn opened 7 years ago

stackjohn commented 7 years ago

Is it possible to implement the try_clone method for the tokio TcpStream type?

I'm using the echo server example. Eventually I hope to iterate over all the currently connected sockets and broadcast a message to each of them, one by one.

Let's say I have the following code:

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

    let mut connections = Arc::new((Mutex::new(Vec::new())));

    let conn = connections.clone(); 
    thread::spawn(move || {
        let mut i;
        loop {
            i = conn.lock().unwrap().len();
            //Here I am hoping to eventually be able to write to each one of the sockets in the vector. 
            println!("There are {} connections", i);
            stdout().flush();
            thread::sleep_ms(10000);
        }
    });

    // Iterate incoming connections
    let server = tcp.incoming().for_each(|(tcp, _)| {

        let connection = Arc::new(Mutex::new(tcp));   

        connections.lock().unwrap().push(connection.lock().try_clone());

        // Split up the read and write halves
        let (reader, writer) = connection.lock().unwrap().split();

        // Future of the copy
        let bytes_copied = io::copy(reader, writer);

        // ... after which we'll print what happened
        let handle_conn = bytes_copied.map(|(n, _, _)| {
            println!("wrote {} bytes", n)
        }).map_err(|err| {
            println!("IO error {:?}", err)
        });

        // Spawn the future as a concurrent task
        handle.spawn(handle_conn);

        Ok(())
    });

The code fails to compile because try_clone is not implemented for the tokio TcpStream. The offending line is:

        connections.lock().unwrap().push(connection.lock().try_clone());

The TcpStream in the standard library, by contrast, does allow cloning through try_clone.

Is there a way to work around this? Apologies, I am very new to Rust, so what I am trying to do might not make any sense at all.

alexcrichton commented 7 years ago

Thanks for the report! Unfortunately this is pretty difficult to support on the backend, as the semantics of duplicated file descriptors and epoll notifications tend to get... interesting (to say nothing of the Windows semantics). I don't think it's impossible, but I don't think it's easy either, and so far we haven't put a huge amount of thought into how to do this.

For now could you use something like Arc<TcpStream>? That way you can read/write and still clone it to get multiple references.
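As a rough illustration of the Arc<TcpStream> idea, here is a minimal sketch that uses the standard library's TcpStream rather than the tokio one, so it runs without an event loop. It relies on the fact that std implements Read and Write for &TcpStream, which lets two threads share one socket through an Arc. The function name shared_stream_roundtrip and the loopback echo peer are assumptions made for the example; they are not part of tokio-core.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: writes `msg` from one thread and reads the echo on
/// another, both through the same `Arc<TcpStream>`.
fn shared_stream_roundtrip(msg: &[u8]) -> io::Result<Vec<u8>> {
    // Port 0 asks the OS for any free port on loopback.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Tiny echo peer: accept one connection and copy bytes back until EOF.
    let server = thread::spawn(move || -> io::Result<()> {
        let (mut peer, _) = listener.accept()?;
        let mut buf = [0u8; 1024];
        loop {
            let n = peer.read(&mut buf)?;
            if n == 0 {
                return Ok(());
            }
            peer.write_all(&buf[..n])?;
        }
    });

    let stream = Arc::new(TcpStream::connect(addr)?);

    // Writer thread: `Write` is implemented for `&TcpStream`, so a shared
    // reference obtained through the Arc is enough to write.
    let writer = Arc::clone(&stream);
    let payload = msg.to_vec();
    let write_task = thread::spawn(move || -> io::Result<()> {
        (&*writer).write_all(&payload)
    });

    // Reader side: `Read` is implemented for `&TcpStream` as well.
    let mut echoed = vec![0u8; msg.len()];
    (&*stream).read_exact(&mut echoed)?;

    write_task.join().unwrap()?;
    // Closing the connection lets the echo peer see EOF and exit.
    stream.shutdown(Shutdown::Both)?;
    server.join().unwrap()?;
    Ok(echoed)
}
```

Calling shared_stream_roundtrip(b"hello") should hand back the same bytes. Whether the same pattern is convenient with an async stream depends on which methods the async type exposes on shared references.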

stackjohn commented 7 years ago

Thanks for your reply, much appreciated. I will take a look at Arc<TcpStream>.

I see there is a good example here.

carllerche commented 7 years ago

Relates to https://github.com/carllerche/mio/issues/361

pavlix commented 3 years ago

A few notes…

Arc<TcpStream> is not usable for reader/writer clones passed to concurrent async blocks.

TcpStream::split() is not usable either, because the reader and writer are not independent and cannot each be moved into its own async block.

The use case of the standard TcpStream::try_clone() is to create two completely independent channels (the dependency is moved to the operating system internals) for reading and writing. The provided example doesn't perform concurrent operations.

However, there is TcpStream::into_split() (which possibly didn't exist when @stackjohn asked) that does the job. It returns a reader/writer pair whose halves can be moved into two separate tasks. It looks like it's not an OS-level solution based on dup(); rather, the TcpStream seems to be moved into dynamically allocated memory and shared by the reader/writer pair.
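The mechanism described above (one heap-allocated stream shared by an owned reader and an owned writer, with no dup() call) can be sketched with the standard library alone. This is not tokio's implementation; OwnedReadHalf, OwnedWriteHalf, into_split and split_roundtrip here are hypothetical names written for the example, modelled on a blocking std TcpStream.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Hypothetical owned read half: a shared handle to the one socket.
struct OwnedReadHalf(Arc<TcpStream>);
/// Hypothetical owned write half: another shared handle to the same socket.
struct OwnedWriteHalf(Arc<TcpStream>);

impl Read for OwnedReadHalf {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // std implements `Read` for `&TcpStream`.
        (&*self.0).read(buf)
    }
}

impl Write for OwnedWriteHalf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (&*self.0).write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        (&*self.0).flush()
    }
}

/// Moves the stream onto the heap once and hands out two owned halves.
fn into_split(stream: TcpStream) -> (OwnedReadHalf, OwnedWriteHalf) {
    let shared = Arc::new(stream);
    (OwnedReadHalf(Arc::clone(&shared)), OwnedWriteHalf(shared))
}

/// Demo: each half is moved into its own thread; a loopback client echoes
/// whatever the write half sends, and the read half receives the echo.
fn split_roundtrip(msg: &[u8]) -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (accepted, _) = listener.accept()?;
    let (mut rd, mut wr) = into_split(accepted);

    let payload = msg.to_vec();
    let writer = thread::spawn(move || wr.write_all(&payload));

    let len = msg.len();
    let reader = thread::spawn(move || -> io::Result<Vec<u8>> {
        let mut buf = vec![0u8; len];
        rd.read_exact(&mut buf)?;
        Ok(buf)
    });

    // The client sends back exactly what it received.
    let mut buf = vec![0u8; msg.len()];
    client.read_exact(&mut buf)?;
    client.write_all(&buf)?;

    writer.join().unwrap()?;
    reader.join().unwrap()
}
```

Because both halves own an Arc, the socket is closed only when the last half is dropped, which is the property that lets each half outlive the scope that created it.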

ramiroaisen commented 1 year ago

You can create a tokio broadcast channel and subscribe to it in the TcpStream handler fn, then when you get new messages you send them back to the TcpStream, no need to clone it.
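A minimal sketch of that shape, kept to the standard library so it runs on its own: tokio provides tokio::sync::broadcast for this, whereas std has no broadcast channel, so the sketch swaps in one mpsc channel per subscriber. Broadcaster and forward_messages are hypothetical names; the point is only that each connection handler keeps sole ownership of its stream and receives messages over a channel.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical hub that fans a message out to every subscribed handler.
#[derive(Clone, Default)]
struct Broadcaster {
    subscribers: Arc<Mutex<Vec<Sender<String>>>>,
}

impl Broadcaster {
    /// Called once per connection; the handler keeps the receiver.
    fn subscribe(&self) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Sends `msg` to every live subscriber and forgets the ones whose
    /// receiver has been dropped. Returns how many subscribers remain.
    fn broadcast(&self, msg: &str) -> usize {
        let mut subs = self.subscribers.lock().unwrap();
        subs.retain(|tx| tx.send(msg.to_string()).is_ok());
        subs.len()
    }
}

/// Per-connection loop: the handler owns `stream` outright and writes each
/// broadcast message to it. The loop ends once every sender is gone.
fn forward_messages<W: Write>(mut stream: W, inbox: Receiver<String>) -> io::Result<W> {
    for msg in inbox {
        stream.write_all(msg.as_bytes())?;
        stream.write_all(b"\n")?;
    }
    Ok(stream)
}
```

In a server, each accepted connection would call subscribe() and run forward_messages with its own TcpStream in its own thread or task, so the stream never has to be cloned or stored in a shared vector.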

SofwanCoder commented 6 months ago

Another good option is to convert the tokio TcpStream into a std TcpStream using TcpStream::into_std(), then clone the std TcpStream using its TcpStream::try_clone() method.

Then you can eventually convert the clones back with TcpStream::from_std.


// Convert to a std stream, then duplicate its handle
let std_tcp_stream = tokio_tcp_stream.into_std()?;
let std_tcp_stream_clone_one = std_tcp_stream.try_clone()?;
let std_tcp_stream_clone_two = std_tcp_stream.try_clone()?;

// Both refer to the same underlying socket
let new_tokio_tcp_stream_one = TcpStream::from_std(std_tcp_stream_clone_one)?;
let new_tokio_tcp_stream_two = TcpStream::from_std(std_tcp_stream_clone_two)?;

QyInvoLing commented 6 months ago

This method takes ownership of the original tokio TcpStream object, which can cause problems in some places (for example, when the stream itself is a field of another struct).
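For contrast, the standard library's try_clone only borrows the stream, so a std TcpStream held as a struct field can be duplicated in place. The sketch below shows just that std behaviour on a loopback connection; Connection, duplicate and clone_shares_socket are hypothetical names, and nothing here removes the ownership requirement of into_std on the tokio side.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

/// Hypothetical wrapper that keeps a std stream as a field.
struct Connection {
    stream: TcpStream,
}

impl Connection {
    /// `try_clone` takes `&self`, so the field stays where it is.
    fn duplicate(&self) -> io::Result<TcpStream> {
        self.stream.try_clone()
    }
}

/// Writes `msg` through a duplicate handle and reads it on the peer side.
/// Returns the bytes the peer saw and whether both handles report the same
/// local address.
fn clone_shares_socket(msg: &[u8]) -> io::Result<(Vec<u8>, bool)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (mut peer, _) = listener.accept()?;

    let conn = Connection { stream: client };
    let mut dup = conn.duplicate()?;

    // Data written through the duplicate arrives on the one connection.
    dup.write_all(msg)?;
    let mut seen = vec![0u8; msg.len()];
    peer.read_exact(&mut seen)?;

    let same_addr = dup.local_addr()? == conn.stream.local_addr()?;
    Ok((seen, same_addr))
}
```

With a tokio stream stored in a struct, one commonly used way around the ownership issue is to hold the field as an Option and take() it before converting, though that leaves the struct without a stream until a new one is put back.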