craftytrickster / stubborn-io

io traits/structs for tokio that automatically recover from potential disconnections/interruptions
https://docs.rs/stubborn-io
MIT License

StubbornTcpStream in tokio::task::spawn_blocking > rt::System::new().block_on(async... #28

Closed: immerhart closed this issue 8 months ago

immerhart commented 9 months ago

Hi, thanks for this handy lib. I'm using StubbornTcpStream in a block close to this:

tokio::task::spawn_blocking(move || {
    rt::System::new().block_on(async {
        let mut ro = ReconnectOptions::new();
        ro.exit_if_first_connect_fails = false;
        ro.on_connect_callback = Box::new(connect_done);
        ro.on_disconnect_callback = Box::new(on_disconnect);
        ro.on_connect_fail_callback = Box::new(on_connect_fail);

        let mut stream = StubbornTcpStream::connect_with_options("127.0.0.1:6379", ro)
            .await
            .unwrap();

        let (reader, writer) = stream.split();
        let mut buf_writer = BufWriter::new(writer);
        let mut buf_reader = BufReader::new(reader);

        // rx is mpsc::Receiver
        while let Some(cmd) = rx.recv().await {
            // ...
        }
    });
});

and for some reason, if I shut down the server socket on the other side, the disconnect is not detected and no reconnect attempts seem to happen. I assume something is stuck in a blocking state (in the while clause) and I need to make some "event processing" call. Or is it an issue with the actix-web runtime? Any ideas? Thanks.

immerhart commented 9 months ago

Note: the same behavior occurs if just a plain task is used, like this:

tokio::task::spawn(async move {
    let mut ro = ReconnectOptions::new();
    ro.exit_if_first_connect_fails = false;
    // ...
});

craftytrickster commented 9 months ago

I will take a look into this and see if I can find the issue. In general, blocking on tasks in Rust/tokio can often be problematic.

immerhart commented 9 months ago

Thanks! Blocking was not the right phrase, I guess; inside the while loop it's actually doing an rx.recv().await.

craftytrickster commented 9 months ago

Unfortunately, I need to see more of your code in order to determine what is going on. Do you have a branch anywhere that I can clone?

immerhart commented 9 months ago

actix-stubb.zip

I put together a quick example. If you have a second process that binds on 127.0.0.1:6379, you can test it like this: start the server, start the client, then close the server. No reconnect happens, and there is also no indication that the connection was closed. I'm pretty new to Rust, so maybe I'm doing something wrong...

craftytrickster commented 9 months ago

Okay, no promises, but if I can find some free time I will take a look at the zip contents and see if I can reproduce.

craftytrickster commented 9 months ago

One quick way to narrow this down is to replace StubbornTcpStream with the regular tokio TcpStream and see if it is aware that the connection closed. If the regular tokio one is not even aware, then you know it's something unrelated to StubbornTcpStream.
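
Roughly, the swap would look like this (just a sketch; the address and the rest of the setup come from your earlier snippet):

// Diagnostic only: use the plain tokio TcpStream in place of
// StubbornTcpStream. If this version also never notices the peer
// closing, the problem is unrelated to stubborn-io.
use tokio::net::TcpStream;

let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
let (reader, writer) = stream.split();
// ...the BufReader/BufWriter setup and the rx loop stay unchanged...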

craftytrickster commented 8 months ago

Did you ever try what I proposed in the comment above?

immerhart commented 8 months ago

Not really, as I'm unsure how to test it...

immerhart commented 8 months ago

Could it be related to not directly using AsyncWrite/Read? I'm using tokio::io::BufReader/BufWriter...

craftytrickster commented 8 months ago

I did not look at your code yet, as I was waiting for your response first. But now that you mention it: if you are using https://docs.rs/tokio/latest/tokio/io/trait.AsyncBufRead.html#tymethod.poll_fill_buf , then I can see why this might fail. Perhaps I need to look at that trait (AsyncBufRead) too, since I currently only implement the regular AsyncRead/AsyncWrite in my code.

craftytrickster commented 8 months ago

I looked at your example:

    let (reader, writer) = stream.split();
    let mut buf_writer = BufWriter::new(writer);
    let mut buf_reader = BufReader::new(reader);

    while let Some(cmd) = rx.recv().await {
        match cmd {
            ExecuterCommand::Request {
                resp: out_resp,
                req: _,
            } => {
                buf_writer.write_u16(123).await.unwrap();
                buf_writer.flush().await.unwrap();

                let _ = out_resp.send(ExecuterResp {
                    id: "data.id".to_string(),
                });
            }
            ExecuterCommand::Close {} => {
                tracing::info!("close request");
                break;
            }
        }
    }

The Read portion of your connection is never used, other than being wrapped, which leaves only the write portion. When your rx.recv() loop receives a new cmd, if the TCP connection was indeed severed, I would expect the StubbornTcpStream to become aware of that when you first try to write the u16, since at that point it should attempt a reconnect.

However, if a disconnect occurs while you are not receiving any new commands in the rx.recv() loop, there is no way for this library to become aware of it, since the Read portion isn't being used. To activate the read portion, you could do something like this (prior to your while loop):

tokio::spawn(async move {
    // `read` needs tokio::io::AsyncReadExt in scope; reading into an
    // empty Vec would always return 0 bytes, so use a fixed-size buffer.
    let mut buf = [0u8; 1024];
    loop {
        match buf_reader.read(&mut buf).await {
            Ok(0) | Err(_) => break, // EOF or error: stop reading
            Ok(n) => println!("Check out my buf!: {:?}", &buf[..n]),
        }
    }
});

I am not sure what messages (if any) are returned by the endpoint you are connected to.
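
If it helps, here is a rough sketch (untested, assuming the rx receiver and buf_reader from your example) of how a single tokio::select! loop could service commands while also polling the read half, so a dropped connection is observed even when no commands arrive:

use tokio::io::AsyncReadExt;

let mut buf = [0u8; 1024];
loop {
    tokio::select! {
        // Polling the read half drives the stream's poll_read,
        // which is where a dropped connection becomes visible.
        res = buf_reader.read(&mut buf) => {
            if matches!(res, Ok(0) | Err(_)) {
                tracing::info!("read half reported close/error");
            }
        }
        maybe_cmd = rx.recv() => match maybe_cmd {
            Some(_cmd) => { /* handle the command as in the example */ }
            None => break, // all senders dropped
        },
    }
}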

craftytrickster commented 8 months ago

I will close this issue per my explanation above.

immerhart commented 8 months ago

My bad, I was very busy (kids...) and forgot to answer your kind reply, sorry for that... I just kept the read part out, as it seemed not relevant (the issue also appeared with the read part included), and I simplified the uploaded code... If I get time to work on it again, I'll recheck whether it's related to not receiving commands in the first place. Thanks once again.