actix / actix

Actor framework for Rust.
Apache License 2.0

Cancelling a `notify_later()` future sometimes fails, eventually delivering the cancelled message. #284

Open vincentdephily opened 5 years ago

vincentdephily commented 5 years ago

This happens pretty reliably in my code once enough messages are scheduled.

There shouldn't be a race condition, as the timeout fires many seconds after it was supposedly cancelled. I've triple-checked that I do cancel the right `SpawnHandle`. I tried to create a reduced test case that I can share, but didn't succeed. I don't think it's the same as issue #206, as there's no UDP socket involved (there is TCP, though).

This is a problem both because of the unexpected timeout that leads to spurious handling, and because the Actor remains alive while waiting for the message.
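
To make the expected contract concrete, here is a minimal std-only model of it: once a handle has been cancelled, its message must never be delivered, and nothing should stay scheduled for it. `Handle` and `Timers` are hypothetical names invented for this sketch; it says nothing about how actix implements `notify_later()` or `cancel_future()` internally.

```rust
use std::collections::BTreeMap;

/// Hypothetical handle type, standing in for actix's `SpawnHandle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Handle(u64);

/// Toy timer queue: models the expected contract only, not actix internals.
#[derive(Default)]
struct Timers {
    next: u64,
    /// handle -> (deadline in ms, message id)
    scheduled: BTreeMap<Handle, (u64, u64)>,
}

impl Timers {
    /// Schedule `msg_id` to be delivered at `deadline_ms`.
    fn notify_later(&mut self, msg_id: u64, deadline_ms: u64) -> Handle {
        let h = Handle(self.next);
        self.next += 1;
        self.scheduled.insert(h, (deadline_ms, msg_id));
        h
    }

    /// Cancel a scheduled message; returns whether it was still pending.
    fn cancel(&mut self, h: Handle) -> bool {
        self.scheduled.remove(&h).is_some()
    }

    /// Deliver (and forget) every message whose deadline has passed.
    fn fire_due(&mut self, now_ms: u64) -> Vec<u64> {
        let due: Vec<Handle> = self
            .scheduled
            .iter()
            .filter(|(_, entry)| entry.0 <= now_ms)
            .map(|(h, _)| *h)
            .collect();
        due.into_iter()
            .filter_map(|h| self.scheduled.remove(&h))
            .map(|(_, msg_id)| msg_id)
            .collect()
    }
}

fn main() {
    let mut t = Timers::default();
    let h0 = t.notify_later(0, 500);
    let _h1 = t.notify_later(1, 500);
    // The reply for request 0 arrives early: cancel its timeout.
    assert!(t.cancel(h0));
    // At 500 ms only the uncancelled timeout may fire.
    assert_eq!(t.fire_due(500), vec![1]);
    // Nothing is left that could keep an actor alive.
    assert!(t.scheduled.is_empty());
    println!("cancelled timeout never fired");
}
```

The behaviour reported in this issue corresponds to the cancelled message id still being delivered once its deadline passes.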

ousado commented 5 years ago

A testcase, even when it fails to demonstrate the problem, can still be useful to illustrate the overall setup.

vincentdephily commented 5 years ago

Finally managed to reproduce with a reduced test case, by bringing in a TCP connection. So it might actually be the same bug as #206, just not tied specifically to UDP. Maybe it's not even tied to a network connection, and any kind of stream handling would trigger it.

Run a local echo server on port 6142, for example with `nc -l -p 6142 -e /bin/cat`, and then `cargo run` the code below. You'll see a `MsgTimeout` being triggered after 500 ms, despite the corresponding `SpawnHandle` having been cancelled a couple of ms after it was scheduled.

Using Linux 64bit, actix v0.8.3, tokio 0.1.22, rustc 1.37.
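
Not every netcat build supports `-e`, so as a stand-in for the echo server (separate from the test case itself), a small std-only Rust program along these lines should do. This is a sketch, not part of the original report: the `serve` and `echo` helper names and the `serve` command-line argument are made up for illustration. Run without arguments it only performs a quick self-check on an ephemeral port.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Copy everything read from the client straight back to it.
fn echo(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let n = stream.read(&mut buf)?;
        if n == 0 {
            return Ok(()); // client closed the connection
        }
        stream.write_all(&buf[..n])?;
    }
}

/// Accept connections forever, echoing each one on its own thread.
fn serve(listener: TcpListener) {
    for stream in listener.incoming() {
        if let Ok(stream) = stream {
            thread::spawn(move || {
                let _ = echo(stream);
            });
        }
    }
}

fn main() -> std::io::Result<()> {
    // `cargo run -- serve`: act as the echo server the test case connects to.
    if std::env::args().nth(1).as_deref() == Some("serve") {
        serve(TcpListener::bind("127.0.0.1:6142")?);
        return Ok(());
    }
    // Otherwise, self-check: echo one 8-byte little-endian id and exit.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    thread::spawn(move || serve(listener));
    let mut client = TcpStream::connect(addr)?;
    client.write_all(&42u64.to_le_bytes())?;
    let mut reply = [0u8; 8];
    client.read_exact(&mut reply)?;
    assert_eq!(u64::from_le_bytes(reply), 42);
    println!("echo ok");
    Ok(())
}
```

Unlike a single `nc -l` invocation, this keeps accepting new connections, so no restart loop is needed between runs of the test case.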

//[package]
//name = "actix_undead_notify"
//version = "0.1.0"
//edition = "2018"
//
//[dependencies]
//actix = { version = "~0.8", default-features = false }
//time = "0.1.42"
//tokio = "0.1.22"
//bytes = "0.4.12"

use actix::{fut::wrap_future, prelude::*};
use bytes::BytesMut;
use std::{collections::BTreeMap,
          convert::TryInto,
          fmt::Display,
          io::Write,
          sync::atomic::{AtomicU64, Ordering},
          thread::sleep,
          time::{Duration, Instant}};
use tokio::{codec::{Decoder, FramedRead},
            io::{AsyncRead, WriteHalf},
            net::TcpStream};

static MSGID: AtomicU64 = AtomicU64::new(0);
/// Request sent to the server, including an id and a timeout. Server should just reply with the
/// same bytes. Use netcat as your server: `while nc -l -p 6142 -e /bin/cat; do sleep 1;done`.
#[derive(Debug, Message)]
struct MsgRequest(u64, u64);
impl MsgRequest {
    fn new() -> Self {
        MsgRequest(MSGID.fetch_add(1, Ordering::AcqRel), 500)
    }
}

/// Keep track of the pending requests by sending ourselves an MsgTimeout after a delay, and
/// cancelling the corresponding `SpawnHandle` if we get the reply in time.
#[derive(Debug, Message)]
struct MsgTimeout(u64);

/// Inbound data type and associated codec.
#[derive(Debug, Message)]
struct MsgTcpIn(u64);
struct U64Codec;
impl Decoder for U64Codec {
    type Item = MsgTcpIn;
    type Error = std::io::Error;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 8 {
            return Ok(None);
        }
        Ok(Some(MsgTcpIn(u64::from_le_bytes(src.split_to(8).as_ref().try_into().unwrap()))))
    }
}

/// Forcibly shut down the system.
#[derive(Debug, Message)]
struct MsgShutdown;

/// Format time.
fn now() -> String {
    let t = time::now_utc();
    format!("{}.{:09.09}Z", t.strftime("%FT%T").unwrap(), t.tm_nsec)
}

/// Stop the system with a message.
fn system_stop(c: i32, s: impl Display) {
    if c == 0 {
        println!("{} Normal system stop", now());
        System::current().stop();
    } else {
        println!("{} !!!!!!!!!!!!!!!! system stop: {}", now(), s);
        System::current().stop_with_code(c);
    }
}

struct Client {
    /// Socket to server
    writer: Option<WriteHalf<TcpStream>>,
    /// Handles to pending MsgTimeouts
    pending: BTreeMap<u64, SpawnHandle>,
    /// Stop the system after a while
    watchdog: Addr<Watchdog>,
}
impl Client {
    fn new(watchdog: Addr<Watchdog>) -> Self {
        Self { writer: None, pending: BTreeMap::new(), watchdog }
    }
}
impl Actor for Client {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Context<Self>) {
        println!("{} client starting", now());
        let con = wrap_future(TcpStream::connect(&"127.0.0.1:6142".parse().unwrap())).map(
            |s: TcpStream, a: &mut Client, c| {
                println!("{} connected", now());
                let (r, w) = s.split();
                a.writer = Some(w);
                c.add_stream(FramedRead::new(r, U64Codec{}));
            },
        ).map_err(|e,_a,_c|{
            system_stop(1, format!("network error: {}", e))
        });
        ctx.spawn(con);
    }
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        println!("{} client stopping {:?}", now(), self.pending);
        Running::Stop
    }
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        println!("{} client stopped {:?}", now(), self.pending);
        if !self.pending.is_empty() {
            system_stop(1, "still have some reqs pending")
        }
    }
}
/// Send request {msg.0} to server and expect a reply within {msg.1} ms. If the connection isn't
/// ready yet, retry later.
impl Handler<MsgRequest> for Client {
    type Result = ();
    fn handle(&mut self, msg: MsgRequest, ctx: &mut Context<Self>) -> Self::Result {
        match &mut self.writer {
            Some(w) => {
                w.write_all(&msg.0.to_le_bytes()).expect("write");
                let h = ctx.notify_later(MsgTimeout(msg.0), Duration::from_millis(msg.1));
                self.pending.insert(msg.0, h);
                println!("{} {:?} sent {:?} {:?}", now(), msg, h, self.pending);
            },
            None => {
                let d = Duration::from_millis(10 + msg.0);
                println!("{} {:?} delayed {:?}", now(), msg, d);
                ctx.notify_later(msg, d);
            },
        }
    }
}
/// Got reply from server. Check that it's an expected reply and cancel the timeout.
impl StreamHandler<MsgTcpIn, std::io::Error> for Client {
    fn handle(&mut self, pkt: MsgTcpIn, ctx: &mut Context<Client>) {
        println!("{} client {:?} {:?}", now(), pkt, self.pending);
        match self.pending.remove(&pkt.0) {
            Some(h) => {
                println!("{} canceling {:?}", now(), h);
                ctx.cancel_future(h);
            },
            None => system_stop(1, "unexpected MsgTcpIn"),
        }
    }
}
/// Got a reply timeout, which shouldn't happen in these test conditions (network echo server
/// running on localhost). Guess `ctx.cancel_future()` didn't do its job properly.
impl Handler<MsgTimeout> for Client {
    type Result = ();
    fn handle(&mut self, msg: MsgTimeout, _ctx: &mut Context<Self>) -> Self::Result {
        system_stop(1, format!("{:?} unexpected {:?}", msg, self.pending));
    }
}

/// Keep test duration in check.
struct Watchdog {
    start: Instant,
}
impl Watchdog {
    fn new() -> Self {
        Watchdog { start: Instant::now() }
    }
}
impl Actor for Watchdog {
    type Context = Context<Self>;
    /// Timeout the whole process after 2s, long after the MsgTimeout should have triggered.
    fn started(&mut self, ctx: &mut Context<Self>) {
        ctx.notify_later(MsgShutdown, Duration::from_secs(2));
    }
    /// Connecting to a server and receiving 5 replies should take no more than ~50ms
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        if self.start.elapsed() < Duration::from_millis(100) {
            system_stop(0, "normal")
        } else {
            system_stop(1, format!("took {:?}", self.start.elapsed()))
        }
    }
}
impl Handler<MsgShutdown> for Watchdog {
    type Result = ();
    fn handle(&mut self, _msg: MsgShutdown, ctx: &mut Context<Self>) -> Self::Result {
        ctx.stop();
    }
}

fn main() {
    System::run(|| {
        let wd = Watchdog::start_in_arbiter(&Arbiter::new(), |_ctx| Watchdog::new());
        let cl = Client::start_in_arbiter(&Arbiter::new(), |_ctx| Client::new(wd));
        for _ in 0..5 {
            cl.do_send(MsgRequest::new());
        }
    }).expect("should exit cleanly");
    // Give some time to print the last output.
    sleep(Duration::from_millis(20));
}
alexlapa commented 3 years ago

@vincentdephily, this might be fixed in #484.