craftytrickster / stubborn-io

io traits/structs for tokio that automatically recover from potential disconnections/interruptions
https://docs.rs/stubborn-io
MIT License

Prevent Deref confusion #33

Open half2me opened 1 week ago

half2me commented 1 week ago

I can't use the provided TCP stream wrapper, since I need to send some custom data immediately after each reconnection.

I tried to follow the examples and implement the trait:

use std::error::Error;
use std::io;
use std::future::Future;
use std::pin::Pin;
use stubborn_io::tokio::{StubbornIo, UnderlyingIo};
use tokio::net::TcpStream;
use tokio::io::{AsyncWriteExt};
use derived_deref::{Deref, DerefMut};
use tokio::time::{Duration, sleep};

#[derive(Deref, DerefMut)]
struct DurableTCPStream(TcpStream);

impl UnderlyingIo<String> for DurableTCPStream {
    fn establish(addr: String) -> Pin<Box<dyn Future<Output = io::Result<Self>> + Send>> {
        Box::pin(async move {
            let parts: Vec<&str> = addr.split('/').collect();
            println!("connecting to {}", parts[0]);
            let mut stream = TcpStream::connect(parts[0]).await?;

            if parts.len() > 1 {
                // hello message was specified, use it
                println!("sending login: {}", parts[1]);
                stream.write_all(format!("{}\n", parts[1]).as_ref()).await?;
            }
            Ok(DurableTCPStream(stream))
        })
    }
}

type StubbornTCP = StubbornIo<DurableTCPStream, String>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = StubbornTCP::connect(String::from("localhost:9999/logincmd")).await?;

    loop {
        println!("sending msg");
        match stream.write_all(b"hello world!\n").await {
            Ok(_) => (),
            Err(e) => {
                println!("{}", e);
            }
        }
        sleep(Duration::from_millis(1000)).await;
    }
}

When running with the debugger, it seems that calling write_all (I've also tried write) calls the underlying TcpStream's poll_write() directly, completely bypassing the retry logic.

What am I missing?

I've put a breakpoint on StubbornIo's poll_write, but it never gets hit.

[Screenshot of debugger]

craftytrickster commented 1 week ago

Thank you for reporting this issue. I will try to look at it tomorrow night, when I believe I may have time.

junderw commented 1 week ago

Someone answered on reddit.

I think this issue can be closed.

u/ToTheBatmobileGuy 25m ago

It's Deref-ing into the normal TcpStream. If you rewrite the call like this:

<StubbornTCP as AsyncWriteExt>::write_all(&mut stream, b"hello world!\n").await

you'll see the issue:

error[E0277]: the trait bound `DurableTCPStream: AsyncWrite` is not satisfied
  --> src/main.rs:39:16
   |
39 |         match <StubbornTCP as AsyncWriteExt>::write_all(&mut stream, b"hello world!\n").await {
   |                ^^^^^^^^^^^ the trait `AsyncWrite` is not implemented for `DurableTCPStream`, which is required by `StubbornIo<DurableTCPStream, String>: AsyncWrite`

StubbornIo only implements AsyncWrite when the underlying T does:

impl<T, C> AsyncWrite for StubbornIo<T, C>
where
    T: UnderlyingIo<C> + AsyncWrite,
    C: Clone + Send + Unpin + 'static,
{
    // ... (method bodies omitted in this excerpt)
}

So you need to actually implement AsyncWrite for your wrapper (DerefMut won't cut it), i.e.:

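// Needed in addition to the imports in the original snippet.
use tokio::io::AsyncWrite;

// Every method simply delegates to the inner TcpStream.
impl AsyncWrite for DurableTCPStream {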
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, io::Error>> {
        <TcpStream as AsyncWrite>::poll_write(Pin::new(&mut self.0), cx, buf)
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        <TcpStream as AsyncWrite>::poll_flush(Pin::new(&mut self.0), cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), io::Error>> {
        <TcpStream as AsyncWrite>::poll_shutdown(Pin::new(&mut self.0), cx)
    }

    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> std::task::Poll<Result<usize, io::Error>> {
        <TcpStream as AsyncWrite>::poll_write_vectored(Pin::new(&mut self.0), cx, bufs)
    }

    fn is_write_vectored(&self) -> bool {
        <TcpStream as AsyncWrite>::is_write_vectored(&self.0)
    }
}
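
To see why the original call compiled and silently skipped the retry logic, here is a minimal std-only sketch of the same method-resolution behaviour. None of these names come from stubborn-io or tokio: `Write`, `Raw`, `Unfixed`, `Fixed` and `Retrying` are hypothetical stand-ins for `AsyncWrite`, `TcpStream`, the two versions of `DurableTCPStream`, and `StubbornIo`.

```rust
use std::ops::{Deref, DerefMut};

// Hypothetical stand-in for `AsyncWrite`.
trait Write {
    fn write_msg(&mut self, msg: &str) -> String;
}

// Hypothetical stand-in for `TcpStream`.
struct Raw;
impl Write for Raw {
    fn write_msg(&mut self, msg: &str) -> String {
        format!("raw: {}", msg)
    }
}

// Stand-in for the wrapper as first written: it derefs to `Raw`
// but does not implement `Write` itself.
struct Unfixed(Raw);
impl Deref for Unfixed {
    type Target = Raw;
    fn deref(&self) -> &Raw { &self.0 }
}
impl DerefMut for Unfixed {
    fn deref_mut(&mut self) -> &mut Raw { &mut self.0 }
}

// Stand-in for the wrapper with the delegating impl added.
struct Fixed(Raw);
impl Write for Fixed {
    fn write_msg(&mut self, msg: &str) -> String { self.0.write_msg(msg) }
}

// Stand-in for `StubbornIo`: derefs to T, and implements `Write`
// only when T does.
struct Retrying<T>(T);
impl<T> Deref for Retrying<T> {
    type Target = T;
    fn deref(&self) -> &T { &self.0 }
}
impl<T> DerefMut for Retrying<T> {
    fn deref_mut(&mut self) -> &mut T { &mut self.0 }
}
impl<T: Write> Write for Retrying<T> {
    fn write_msg(&mut self, msg: &str) -> String {
        format!("retrying({})", self.0.write_msg(msg))
    }
}

fn send_unfixed() -> String {
    let mut s = Retrying(Unfixed(Raw));
    // `Retrying<Unfixed>` is not `Write`, so the compiler keeps
    // auto-dereferencing: Retrying -> Unfixed -> Raw.
    s.write_msg("hi")
}

fn send_fixed() -> String {
    let mut s = Retrying(Fixed(Raw));
    // `Fixed: Write`, so `Retrying<Fixed>: Write` and its impl is chosen.
    s.write_msg("hi")
}

fn main() {
    println!("{}", send_unfixed());
    println!("{}", send_fixed());
}
```

In this model, the unfixed wrapper reaches `Raw` directly, while the fixed one goes through the `Retrying` impl. Writing the call as `<Retrying<Unfixed> as Write>::write_msg(...)` would refuse to compile, which mirrors the E0277 error quoted above.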

craftytrickster commented 1 week ago

Thank you, @junderw

junderw commented 1 week ago

Just an idea: Since the whole point of this wrapping type is to use the AsyncWrite impl that does retries, and that impl requires certain impls to impl (English is hard lol)

Then it might make sense to place those trait bounds on the struct itself just to prevent these kinds of errors in the future, so that people will get an error as soon as they try to name the type with T and C that won't ever utilize the AsyncWrite impl in this library.
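
A rough std-only sketch of what bounds on the struct itself could look like. This is not stubborn-io's actual definition: `Write`, `Raw`, `Forgetful` and `Retrying` are hypothetical stand-ins, and the real bounds would presumably involve `UnderlyingIo<C>` and `AsyncWrite`.

```rust
// Hypothetical stand-in for `AsyncWrite`.
trait Write {
    fn write_msg(&mut self, msg: &str) -> String;
}

// Hypothetical stand-in for `TcpStream`.
struct Raw;
impl Write for Raw {
    fn write_msg(&mut self, msg: &str) -> String {
        format!("raw: {}", msg)
    }
}

// A wrapper whose author forgot to implement `Write`.
#[allow(dead_code)]
struct Forgetful(Raw);

// The bound lives on the struct, so the type cannot even be named
// with a T that would never reach the retrying impl.
struct Retrying<T: Write>(T);

impl<T: Write> Write for Retrying<T> {
    fn write_msg(&mut self, msg: &str) -> String {
        format!("retrying({})", self.0.write_msg(msg))
    }
}

fn send() -> String {
    let mut ok = Retrying(Raw);
    // Uncommenting the next line fails to compile, because
    // `Forgetful` does not satisfy the `Write` bound:
    // let _bad = Retrying(Forgetful(Raw));
    ok.write_msg("hi")
}

fn main() {
    println!("{}", send());
}
```

The trade-off in this sketch is that every impl block on `Retrying` has to repeat the bound, but the mistake surfaces at the point where the type is constructed rather than at runtime.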

craftytrickster commented 1 week ago

This may be worth considering. I have thought about this in the past, because of the confusion that the Deref can cause in certain cases. I will keep this issue open for reference, but for now I do not have an active plan to change the API.

junderw commented 1 week ago

Might want to re-name the issue to "Prevent Deref confusion." or something.

Maybe this can all be avoided by removing the Deref(Mut) impl and using AsRef and AsMut instead.

It might add a few .as_mut() here and there for the consumer, but it's better than having weird magical bugs like this (where they try it by hand a few times and don't run into a disconnect, deploy, and then realize it's not reconnecting at all after prod deploy)
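
A small std-only sketch of that idea, assuming a wrapper that exposes its inner value only through `AsRef`/`AsMut`. The names `Write`, `Raw`, `Forgetful` and `Retrying` are hypothetical stand-ins, not the library's types.

```rust
// Hypothetical stand-in for `AsyncWrite`.
trait Write {
    fn write_msg(&mut self, msg: &str) -> String;
}

// Hypothetical stand-in for `TcpStream`.
struct Raw;
impl Write for Raw {
    fn write_msg(&mut self, msg: &str) -> String {
        format!("raw: {}", msg)
    }
}

// A wrapper whose author forgot to implement `Write`.
#[allow(dead_code)]
struct Forgetful(Raw);

// Stand-in for `StubbornIo` without Deref/DerefMut.
struct Retrying<T>(T);

impl<T> AsRef<T> for Retrying<T> {
    fn as_ref(&self) -> &T { &self.0 }
}
impl<T> AsMut<T> for Retrying<T> {
    fn as_mut(&mut self) -> &mut T { &mut self.0 }
}
impl<T: Write> Write for Retrying<T> {
    fn write_msg(&mut self, msg: &str) -> String {
        format!("retrying({})", self.0.write_msg(msg))
    }
}

fn send_retrying() -> String {
    let mut s = Retrying(Raw);
    // Without Deref there is no fallback target, so this can only
    // resolve to the wrapper's own impl.
    s.write_msg("hi")
}

fn send_raw() -> String {
    let mut s = Retrying(Raw);
    // Bypassing the wrapper now has to be spelled out.
    let inner: &mut Raw = s.as_mut();
    inner.write_msg("hi")
}

fn main() {
    println!("{}", send_retrying());
    println!("{}", send_raw());
    // `Retrying(Forgetful(Raw)).write_msg("hi")` would be a compile
    // error here instead of silently reaching the inner stream.
}
```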

craftytrickster commented 1 week ago

Similar previous issue - https://github.com/craftytrickster/stubborn-io/issues/22