libp2p / rust-yamux

Multiplexer over reliable, ordered connections.
Apache License 2.0
179 stars 43 forks source link

What is the idiomatic way to create multiple streams? #183

Closed caelansar closed 10 months ago

caelansar commented 10 months ago

Hi! I recently tried to upgrade yamux to the latest version and I want to provide a method to create a stream from a connection in my project

In yamux 0.10.2, I can use

pub struct YamuxCtrl<S> {
    ctrl: Control,
    _conn: PhantomData<S>,
}

impl<S> YamuxCtrl<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    pub fn new_client(stream: S) -> Self {
        let conn = Connection::new(stream.compat(), Config::default(), Mode::Client);

        let ctrl = conn.control();

        tokio::spawn(yamux::into_stream(conn).try_for_each_concurrent(None, |_stream| future::ready(Ok(()))));

        Self {
            ctrl,
            _conn: marker::PhantomData,
        }
    }
    async fn open_stream(&mut self) -> Result<ClientStream<Compat<yamux::Stream>>, Error> {
        let stream = self.ctrl.open_stream().await?;
        Ok(ClientStream::new(stream.compat()))
    }
}

I can use open_stream to obtain a stream, but how can I do this in yamux 0.13.1 since Control has been removed and we need call Connection::poll_next_inbound repeatedly in order to make progress

I found some examples in tests, it looks like:

        let mut stream = future::poll_fn(|cx| client.poll_new_outbound(cx))
            .await
            .unwrap();
        task::spawn(noop_server(stream::poll_fn(move |cx| {
            client.poll_next_inbound(cx)
        })));

        stream.write_all(&msg.0).await?;
        stream.close().await?;    }

but since the ownership of client has already been moved into task::spawn, how do I then obtain a stream from this connection afterward?

thomaseizinger commented 10 months ago

You have two options:

caelansar commented 10 months ago

You have two options:

  • Create an event loop and communicate with it via channels (send incoming streams out of the loop and request new streams by sending messages into it)
  • Compose your application logic directly with a loop that continuously polls the Connection

@thomaseizinger Thanks! I create a loop in new_client method and send message in open_stream via a channel to get a stream. Now it works as I expected