jeikabu / runng

MIT License

Canceling an async receive drops messages #65

Open Andrepuel opened 4 years ago

Andrepuel commented 4 years ago

From the source code:

struct WorkQueue {
    waiting: VecDeque<oneshot::Sender<Result<NngMsg>>>,
    ready: VecDeque<Result<NngMsg>>,
}

impl WorkQueue {
    fn push_back(&mut self, message: Result<NngMsg>) {
        if let Some(sender) = self.waiting.pop_front() {
            sender
                .send(message)
                .unwrap_or_else(|err| debug!("Dropping message: {:?}", err));
        } else {
            self.ready.push_back(message);
        }
    }

    fn pop_front(&mut self) -> AsyncMsg {
        // If a value is ready, return it immediately. Otherwise, register a
        // oneshot channel and return a future that resolves once a message is sent.
        if let Some(item) = self.ready.pop_front() {
            Box::pin(future::ready(item))
        } else {
            let (sender, receiver) = oneshot::channel();
            self.waiting.push_back(sender);
            let receiver = receiver.map(result::flatten_result);
            Box::pin(receiver)
        }
    }
}

When you read from a socket, the pop_front function is called. If no message is ready, a oneshot channel is registered in the waiting queue. If the read is then cancelled (the AsyncMsg is dropped), the receiver goes away but the sender stays registered. When a message eventually arrives, push_back sends it to that dead channel; the send fails and the message is dropped.
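To make the failure mode concrete, here is a minimal std-only sketch. It is not the runng code: `std::sync::mpsc` stands in for the oneshot channel, `String` stands in for `Result<NngMsg>`, and `register_waiter` and `lost_message_demo` are hypothetical names used only for illustration.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for the WorkQueue above (assumption: String replaces
/// Result<NngMsg>, and an mpsc channel replaces the oneshot channel).
struct WorkQueue {
    waiting: VecDeque<Sender<String>>,
    ready: VecDeque<String>,
}

impl WorkQueue {
    fn new() -> Self {
        WorkQueue { waiting: VecDeque::new(), ready: VecDeque::new() }
    }

    /// Mirrors the original push_back: a single send attempt, and the
    /// message is discarded if that attempt fails. Returns whether the
    /// message was kept (delivered or queued).
    fn push_back(&mut self, message: String) -> bool {
        if let Some(sender) = self.waiting.pop_front() {
            sender.send(message).is_ok()
        } else {
            self.ready.push_back(message);
            true
        }
    }

    /// Mirrors the "nothing ready" branch of pop_front: register a sender
    /// and hand the receiving half to the caller.
    fn register_waiter(&mut self) -> Receiver<String> {
        let (sender, receiver) = channel();
        self.waiting.push_back(sender);
        receiver
    }
}

/// Registers a waiter, cancels it by dropping the receiver, then pushes a message.
fn lost_message_demo() -> (bool, usize) {
    let mut queue = WorkQueue::new();
    let cancelled = queue.register_waiter();
    drop(cancelled); // the read is cancelled, but the sender stays in `waiting`
    let kept = queue.push_back("hello".to_string());
    (kept, queue.ready.len())
}

fn main() {
    let (kept, ready_len) = lost_message_demo();
    // The message was neither delivered nor queued.
    println!("kept: {kept}, ready queue length: {ready_len}");
}
```

In this sketch the message ends up in neither the waiter nor the ready queue, which matches the behaviour described above.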

A simple workaround would be to put the body of push_back in a loop, retrying the send until a live channel accepts the message or, if no waiters remain, the message is placed in the ready queue.
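The retry idea can be sketched with the standard library alone. This is an illustration under assumptions, not the runng implementation: `std::sync::mpsc` stands in for the oneshot channel, `String` for `Result<NngMsg>`, and `register_waiter` and `retry_demo` are hypothetical helpers. It relies on the failed send handing the message back inside `SendError`.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Simplified stand-in for WorkQueue (assumption: String replaces Result<NngMsg>).
struct WorkQueue {
    waiting: VecDeque<Sender<String>>,
    ready: VecDeque<String>,
}

impl WorkQueue {
    fn new() -> Self {
        WorkQueue { waiting: VecDeque::new(), ready: VecDeque::new() }
    }

    /// Tries each registered waiter in turn. A failed send returns the
    /// message, so it can be offered to the next waiter; if no live
    /// waiter is left, the message goes to the ready queue.
    fn push_back(&mut self, mut message: String) {
        while let Some(sender) = self.waiting.pop_front() {
            match sender.send(message) {
                Ok(()) => return,
                Err(SendError(unsent)) => message = unsent,
            }
        }
        self.ready.push_back(message);
    }

    /// Registers a sender and returns the receiving half to the caller.
    fn register_waiter(&mut self) -> Receiver<String> {
        let (sender, receiver) = channel();
        self.waiting.push_back(sender);
        receiver
    }
}

/// One cancelled waiter followed by one live waiter, then a push with no
/// waiters left at all.
fn retry_demo() -> (Option<String>, usize) {
    let mut queue = WorkQueue::new();
    let cancelled = queue.register_waiter();
    let live = queue.register_waiter();
    drop(cancelled); // simulate a cancelled read

    queue.push_back("first".to_string()); // skips the dead waiter
    let delivered = live.try_recv().ok();

    queue.push_back("second".to_string()); // no waiters: lands in `ready`
    (delivered, queue.ready.len())
}

fn main() {
    let (delivered, ready_len) = retry_demo();
    println!("delivered: {delivered:?}, ready queue length: {ready_len}");
}
```

A check along these lines (cancel one waiter, then confirm the message reaches the next waiter or the ready queue) could also serve as the starting point for a regression test.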

Cancelling a read is useful when you use the select! macro.

Andrepuel commented 4 years ago

The following workaround worked for me:

impl WorkQueue {
    fn push_back(&mut self, message: Result<NngMsg>) {
        let mut message = Some(message);
        loop {
            if let Some(sender) = self.waiting.pop_front() {
                if let Err(unsent_message) = sender
                    .send(message.take().unwrap())
                {
                    message = Some(unsent_message);
                    continue;
                }
            } else {
                self.ready.push_back(message.take().unwrap());
            }

            break;
        }
    }

    fn pop_front(&mut self) -> AsyncMsg {
        // If a value is ready, return it immediately. Otherwise, register a
        // oneshot channel and return a future that resolves once a message is sent.
        if let Some(item) = self.ready.pop_front() {
            Box::pin(future::ready(item))
        } else {
            let (sender, receiver) = oneshot::channel();
            self.waiting.push_back(sender);
            let receiver = receiver.map(result::flatten_result);
            Box::pin(receiver)
        }
    }
}

jeikabu commented 4 years ago

Seems reasonable.

If you could add a test verifying the correct behavior, I'd gladly accept a PR.