scottlamb / retina

High-level RTSP multimedia streaming library, in Rust
https://crates.io/crates/retina
Apache License 2.0
220 stars, 46 forks

custom TCP/UDP transporter support #7

Open lattice0 opened 2 years ago

lattice0 commented 2 years ago

I'm integrating retina in my project right now and there's one crucial thing I'll need: a custom TCP/UDP transporter.

Rust's hyper HTTP crate supports a custom TCP transporter through something that implements AsyncWrite/AsyncRead. Can I make a PR to add support for this? I haven't yet read how you make the TCP calls, but it would be nice to have the same support as hyper. I don't know whether your library relies on tokio streams. If not, that could be difficult. Do you have suggestions?

Here's how I did it in my client: https://github.com/lattice0/rust_rtsp_client/blob/489491ee70f6432dca801c49053d55b13264524e/src/client.rs#L181

Here's a test I did on hyper using a custom transporter: https://github.com/lattice0/rust_hyper_custom_transporter/blob/master/src/custom_req.rs

About UDP, tokio has no traits for it so we'd have to think about how to do it.

scottlamb commented 2 years ago

Yeah, a PR like that would be welcome. Roughly I'd expect for TCP the following would work:

UDP isn't supported at all yet.

lattice0 commented 2 years ago

ok, first for the describe signature. Could it be something like this:

pub async fn describe_with_custom_transport(
    url: Url,
    creds: Option<Credentials>,
    write: impl tokio::io::AsyncWrite + Unpin + Send + 'static,
    read: impl tokio::io::AsyncRead + Unpin + Send + 'static,
) -> Result<Self, Error>;

or maybe a struct that holds both of the write/read?

Remember that we also need to provide a means of reconnection, as AsyncWrite and AsyncRead are formed from a socket that could close. So maybe we also provide a function that returns a new pair of write: tokio::io::AsyncWrite + Unpin + Send + 'static, read: tokio::io::AsyncRead + Unpin + Send + 'static?

I'm unsure whether we should pass an already connected read/write pair or whether the RtspConnection should be responsible for connecting.

About ConnectionContext, maybe we can create a struct/trait that holds the reconnect function and the read/write streams, and also holds a ConnectionContext?

Something like this:

struct CustomTransporter {
    // These are Options so we can pass streams that are already connected or not.
    // Trait objects are unsized, so they have to be boxed.
    read: Option<Box<dyn tokio::io::AsyncRead + Unpin + Send + 'static>>,
    write: Option<Box<dyn tokio::io::AsyncWrite + Unpin + Send + 'static>>,
    // The error type is still an open question.
    reconnect: Box<dyn Fn() -> Result<CustomTransporter, (?)> + Send>,
    context: ConnectionContext,
}

and then

pub async fn describe_with_custom_transport(url: Url, creds: Option<Credentials>, custom_transporter: CustomTransporter) -> Result<Self, Error>;

scottlamb commented 2 years ago

May I ask what you're doing that you have a separate write and read? I was imagining something like a SOCKS proxy, but with that I'd expect them to be bundled.

Remember that we also need to provide a means of reconnection, as AsyncWrite and AsyncRead are formed from a socket that could close.

I'm a little confused. Last time we talked IIRC you were focused on interleaved channels. I don't think it's necessary to reopen the socket there—tearing down the RTSP connection also destroys the session. I'd rather retry at a higher level than have the session object try to re-establish itself. Did you decide to switch to RTP/UDP instead?

I'm open to this library supporting UDP eventually but haven't prioritized it or put any real thought into a UDP-friendly API yet.
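
For illustration, retrying at a higher level could look roughly like the sketch below. It is synchronous and uses only the standard library to stay self-contained; `run_with_retry` and its parameters are hypothetical names, not part of retina. The closure stands for "open a fresh connection, set up a fresh session, and stream until something fails".

```rust
use std::io;
use std::thread;
use std::time::Duration;

/// Runs `session` until it succeeds or `max_attempts` is exhausted.
///
/// Hypothetical helper: every attempt is expected to build a brand-new
/// connection and session rather than revive the old one.
fn run_with_retry<T, F>(max_attempts: u32, backoff: Duration, mut session: F) -> io::Result<T>
where
    F: FnMut(u32) -> io::Result<T>,
{
    // Returned only if the loop never runs (max_attempts == 0).
    let mut last_err = io::Error::new(io::ErrorKind::InvalidInput, "max_attempts must be > 0");
    for attempt in 1..=max_attempts {
        match session(attempt) {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = e;
                // Wait before the next attempt, but not after the final one.
                if attempt < max_attempts {
                    thread::sleep(backoff);
                }
            }
        }
    }
    Err(last_err)
}
```

With this shape the session object never has to know how to reconnect: a dropped socket surfaces as an error, and the caller decides whether to start over.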

lattice0 commented 2 years ago

Ok, no reconnection needed. I guess leaving it to the RTSP client at a higher level is ok.

The separate write and read is because of the traits AsyncWrite and AsyncRead. At least hyper does it this way, so I thought it would be a good idea to stick with the same interface. Do you know another good tokio trait that supports both write and read at the same time?

scottlamb commented 2 years ago

There are two traits, but hyper seems to expect them to be implemented by a single type and supplied as a single value. E.g., look at async-std-hyper, an example of how to use hyper from the async-std runtime. It defines a HyperStream which implements both AsyncRead and AsyncWrite. There's only one of them per connection. I think in general you have one value that does both things and it's more work to split them apart (and may need a mutex or something to prove to the borrow checker that doing so is valid).

So I was thinking a type parameter like S: AsyncRead + AsyncWrite.
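
As a rough sketch of that shape, the example below uses the synchronous `std::io::Read` and `std::io::Write` traits as stand-ins for tokio's `AsyncRead` and `AsyncWrite`, so that it compiles with the standard library alone. `MockStream`, `Connection`, and `options` are hypothetical names for illustration and are not retina's API.

```rust
use std::io::{self, Cursor, Read, Write};

/// In-memory stand-in for a custom transport (hypothetical).
/// Reads come from a canned response; writes are captured in a buffer.
struct MockStream {
    incoming: Cursor<Vec<u8>>,
    outgoing: Vec<u8>,
}

impl Read for MockStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.incoming.read(buf)
    }
}

impl Write for MockStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.outgoing.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical connection type, generic over ONE value that both reads and writes.
struct Connection<S: Read + Write> {
    stream: S,
}

impl<S: Read + Write> Connection<S> {
    fn new(stream: S) -> Self {
        Connection { stream }
    }

    /// Sends an RTSP OPTIONS request and returns the first line of the reply.
    /// Reads until EOF, which is good enough for a canned in-memory response.
    fn options(&mut self, url: &str) -> io::Result<String> {
        write!(self.stream, "OPTIONS {} RTSP/1.0\r\nCSeq: 1\r\n\r\n", url)?;
        self.stream.flush()?;
        let mut response = String::new();
        self.stream.read_to_string(&mut response)?;
        Ok(response.lines().next().unwrap_or("").to_owned())
    }
}
```

Because the stream is a single owned value, the connection can alternate between writing and reading through `&mut self.stream` without splitting it or adding a lock.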

lattice0 commented 2 years ago

Ok, what about

trait AsyncRW: tokio::io::AsyncRead + tokio::io::AsyncWrite {} 

impl<T> AsyncRW for T
where
    T: AsyncRead + AsyncWrite
{
}

struct CustomTransporter {
    socket: Box<dyn AsyncRW>,
    context: ConnectionContext
}

pub async fn describe_with_custom_transport(url: Url, creds: Option<Credentials>, custom_transporter: CustomTransporter) -> Result<Self, Error>;

then?
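
To show how the combined trait plus blanket impl behaves as a boxed trait object, here is a minimal sketch that again uses `std::io::Read`/`Write` as synchronous stand-ins so it needs no external crates. `ReadWrite` and `Transport` are hypothetical names, and the `String` context is only a placeholder for `ConnectionContext`. With the tokio traits, the boxed type would likely also need `Unpin` (and probably `Send`) bounds, since the `AsyncReadExt`/`AsyncWriteExt` helper methods require `Self: Unpin`.

```rust
use std::io::{self, Read, Write};

/// Combined trait, needed because `dyn Read + Write` is not a valid type:
/// a trait object may name only one non-auto trait.
trait ReadWrite: Read + Write {}

/// Blanket impl: anything that reads and writes is automatically a `ReadWrite`.
impl<T: Read + Write> ReadWrite for T {}

/// Hypothetical analog of the proposed `CustomTransporter`.
struct Transport {
    socket: Box<dyn ReadWrite + Send>,
    /// Placeholder for retina's `ConnectionContext`.
    context: String,
}

impl Transport {
    fn new(socket: impl ReadWrite + Send + 'static, context: &str) -> Self {
        Transport {
            socket: Box::new(socket),
            context: context.to_owned(),
        }
    }

    fn context(&self) -> &str {
        &self.context
    }

    /// Reads everything available until EOF.
    fn read_response(&mut self) -> io::Result<String> {
        let mut buf = String::new();
        self.socket.read_to_string(&mut buf)?;
        Ok(buf)
    }

    /// Writes a full request and flushes it.
    fn send(&mut self, request: &str) -> io::Result<()> {
        self.socket.write_all(request.as_bytes())?;
        self.socket.flush()
    }
}
```

The trade-off against a type parameter is the usual one: the boxed form keeps the caller's signature free of generics at the cost of dynamic dispatch on every read and write.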

scottlamb commented 2 years ago

Sorry, I missed your trait proposal quite a while ago.

Is this something you're still looking for? Could you help me understand the use case / motivation? It adds a bit of complexity, particularly if we also support UDP transport. Do you need that also, or just TCP?

Re: your trait proposal above:

trait TcpOpener {
    fn open(&self, url: &Url) -> Result<Box<dyn TcpConnection>, BoxedError>;
}

trait TcpConnection : tokio::io::AsyncRead + tokio::io::AsyncWrite {
    fn ctx(&self) -> &ConnectionContext;

    /// Opens UDP sockets for communicating with the peer.
    ///
    /// The new sockets must be on adjacent port numbers, returned by `ctx`.
    fn open_udp_sockets(&mut self) -> Result<Box<dyn UdpSockets>, BoxedError>;
}

struct UdpInterest; // ... bitmask of (rtp, rtcp) X (send, receive)
enum UdpSocket { Rtp, Rtcp }

/// Type for `try_recv_*` to use, analogous to `std::io::IoSliceMut`.
///
/// On *nix platforms, should wrap `libc::msghdr` as follows.
#[repr(transparent)]
struct MessageMut<'a> {
    raw: libc::msghdr, // should have unsafe accessor; filling incorrectly is unsound.
    marker: PhantomData<&'a ()>,
}

/// Likewise for `try_send_*`, analogous to `std::io::IoSlice`.
#[repr(transparent)]
struct Message<'a> { /* similar */ }

/// Represents RTP and RTCP sockets for a single RTSP stream (aka RTP session) using UDP transport.
///
/// Closes on drop.
trait UdpSockets {
    fn connect(&mut self, server_rtp_socket: std::net::SocketAddr) -> std::io::Result<()>;

    /// Returns the context for this stream.
    ///
    /// Initially, this can be used to obtain the RTP and RTCP port numbers,
    /// with placeholder values for the server. The server values are filled on `connect`.
    fn ctx(&self) -> &StreamContext;

    fn poll_ready(&self, interest: UdpInterest, cx: &mut Context<'_>) -> Poll<std::io::Result<UdpInterest>>;

    /// Hint for maximum number of messages handled by a single call to `try_{recv,send}_*`.
    ///
    /// This allows Retina to avoid preparing messages that won't be handled, if it's known that
    /// the platform doesn't support `recvmmsg` and `sendmmsg`.
    fn max_msgs_hint(&self) -> usize { 1 }

    /// Receives messages via `libc::recvmmsg` if available, or single-message version otherwise.
    ///
    /// Returns `std::io::ErrorKind::WouldBlock` if not ready.
    fn try_recv_rtp(&self, msgs: &mut [MessageMut]) -> std::io::Result<usize>;
    fn try_recv_rtcp(&self, msgs: &mut [MessageMut]) -> std::io::Result<usize>;

    /// Sends messages via `libc::sendmmsg` if available, or single-message version otherwise.
    ///
    /// Returns `std::io::ErrorKind::WouldBlock` if not ready.
    fn try_send_rtp(&self, msgs: &mut [Message]) -> std::io::Result<usize>;
    fn try_send_rtcp(&self, msgs: &mut [Message]) -> std::io::Result<usize>;
}
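
As an illustration of the "adjacent port numbers" requirement, a default implementation on top of ordinary OS sockets might bind the pair roughly as sketched below. `bind_rtp_rtcp_pair` is a hypothetical helper, not part of retina. Putting RTP on the even port and RTCP on the next odd one follows the RFC 3550 recommendation and is an assumption here, since the draft above only asks for adjacency.

```rust
use std::io;
use std::net::{IpAddr, SocketAddr, UdpSocket};

/// Tries to bind an RTP/RTCP socket pair on adjacent ports:
/// RTP on an even port, RTCP on the following odd port.
///
/// Scans up to `attempts` candidate pairs starting at `first_port`.
fn bind_rtp_rtcp_pair(
    ip: IpAddr,
    first_port: u16,
    attempts: u16,
) -> io::Result<(UdpSocket, UdpSocket)> {
    // Start from the first even port at or above `first_port`.
    // The arithmetic is done in u32 so it cannot overflow.
    let start = u32::from(first_port) + u32::from(first_port % 2);
    for i in 0..u32::from(attempts) {
        let candidate = start + 2 * i;
        if candidate + 1 > u32::from(u16::MAX) {
            break; // ran off the end of the port range
        }
        let rtp_port = candidate as u16;
        // Both binds must succeed; if either port is taken, try the next pair.
        let rtp = match UdpSocket::bind(SocketAddr::new(ip, rtp_port)) {
            Ok(socket) => socket,
            Err(_) => continue,
        };
        match UdpSocket::bind(SocketAddr::new(ip, rtp_port + 1)) {
            Ok(rtcp) => return Ok((rtp, rtcp)),
            Err(_) => continue, // `rtp` is dropped, and so closed, here
        }
    }
    Err(io::Error::new(
        io::ErrorKind::AddrInUse,
        "no free adjacent UDP port pair found",
    ))
}
```

A userspace network stack would supply its own equivalent behind `open_udp_sockets`; the point is only that both ports are reserved together before they are advertised to the server.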

lattice0 commented 2 years ago

I definitely still need it for my project. I want to send/receive TCP/UDP packets over OpenVPN (https://github.com/lattice0/true_libopenvpn3_rust). Everything is ready, but of course retina must be able to send things over a custom userspace socket. I need UDP also.

Your draft looks nice. I just don't have time to try to implement it right now, since my app has become a free-time job.

scottlamb commented 2 years ago

Userspace VPN on Android: cool!

lattice0 commented 2 years ago

It took me some months of research: first getting to understand openvpn3 (gosh, it's .hpp only) and making a client, then learning Rust to integrate a TCP/IP stack (the C++ ones are shit; thankfully we have smoltcp), then integrating everything into Rust. But it's finally done, so not only can we do userspace VPN on Android, but you can also connect to multiple VPNs at the same time, something impossible on Android at the system level (as far as I know). Of course it's going to work on Windows, macOS, and iOS too, but that's for later.

So it would be nice to have a userspace TCP/IP interface in retina 🥰👍
