zesterer / flume

A safe and fast multi-producer, multi-consumer channel.
https://crates.io/crates/flume
Apache License 2.0

Clarify behavior of rendezvous channels #145

Closed orlp closed 6 months ago

orlp commented 6 months ago

For my purpose it is important that the sender does not wake up before the receiver received its value in a rendezvous channel.

It does appear (reading from the code) that flume actually does this, only returning Poll::Ready once a value has physically been received. However, the docs do not guarantee this (emphasis mine):

Like std::sync::mpsc, flume supports ‘rendezvous’ channels. A bounded queue with a maximum capacity of zero will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a ‘Glienicke Bridge’-style location at which senders and receivers perform a handshake and transfer ownership of a value.

Technically in this scenario (where send, recv are from flume::bounded(0)):

// Task 1.
recv.recv_async().await;
println!("hey");

// Task 2.
send.send_async(()).await;
println!("ho");

it is allowed to print ho, hey instead of hey, ho on a single-threaded executor since by the time send.send_async(()).await occurs the waiter could be "available" and thus the sender immediately continues.

Could the guarantee be given that this never happens? That is, instead of this:

block senders until a receiver is available to take the value

flume would guarantee this:

block senders until a receiver has taken the value

zesterer commented 6 months ago

For my purpose it is important that the sender does not wake up before the receiver received its value in a rendezvous channel.

What do you mean by 'wake up'? In the extreme case, pessimistic scheduler behaviour could certainly mean that the sender could start running before the receiver starts running, and there's pretty much nothing you can do to stop that.

it is allowed to print ho, hey instead of hey, ho

That's entirely governed by the behaviour of the async runtime. Theoretically, flume won't wake up the sender until it knows that a receiver is currently ready and waiting to receive a message (note that this is not the same thing as the receiver having woken up!), but there's no telling what order the async runtime's scheduler will actually decide to wake the tasks up.

I strongly recommend not building your system around very strict assumptions like this. If you want to guarantee ordering, use the synchronisation primitives available to you - like mutexes, signals, etc. - so that you're not relying on changeable behaviour of the runtime.

orlp commented 6 months ago

What do you mean by 'wake up'?

Unblocking is probably the better word.

I mean that send.send_async(()).await does not unblock until the value it tried to send has been received; that is, until an associated recv.recv_async().await has successfully unblocked.

it is allowed to print ho, hey instead of hey, ho

That's entirely governed by the behaviour of the async runtime.

In a single-threaded executor that is literally impossible, given how it's implemented right now.

In the extreme case, pessimistic scheduler behaviour could certainly mean that the sender could start running before the receiver starts running, and there's pretty much nothing you can do to stop that.

You absolutely could, and flume already does. It returns Poll::Pending unless the value has been picked up by a receiver, at least if I read the poll function correctly.

use the synchronisation primitives available to you

I would consider a rendezvous channel a synchronization primitive.

so that you're not relying on changeable behaviour of the runtime

What I'm describing is runtime-agnostic.

zesterer commented 6 months ago

Unblocking is probably the better word.

I think you're getting confused here: there are two distinct concepts at play that are not the same. An async task being placed into a ready state first does not necessarily imply that it will be the first task to run when the scheduler next gets to make such a decision.

Consider the following timeline:

1) Task 1 calls recv(). The task is switched from a ready state to a waiting state.

2) Task 1 .awaits the future returned by recv(). Doing so causes the task to yield to the runtime, at which point the runtime's scheduler decides to run Task 2 (since it is the only one of the two tasks currently in a ready state).

3) Task 2 calls send(()). Because there is a receiver (Task 1) already waiting, it does not need to switch itself into a waiting state and so it simply places the message on the queue. Placing the message on the queue causes Task 1 to be switched to a ready state.

4) Task 2 .awaits the future returned by send(()). Await still causes the task to yield to the runtime (yes, even if the future is immediately ready; the Tokio documentation demonstrates this).

5) The runtime needs to make a decision about which task to run next. Because Task 1 and Task 2 are both in a ready state, the choice as to which gets run first is arbitrary.

Note that all of this applies for a single-threaded async runtime too.
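
To make that concrete, here is a minimal sketch (assuming Tokio's current-thread runtime; the setup is just the example from the issue description, run to completion) in which the scheduler is free to print "ho" before "hey":

// Both tasks run on a single-threaded (current-thread) Tokio scheduler.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (send, recv) = flume::bounded(0);

    // Task 1: waits for a value on the rendezvous channel.
    let task1 = tokio::spawn(async move {
        recv.recv_async().await.unwrap();
        println!("hey");
    });

    // Task 2: send_async returns Ready as soon as a receiver is waiting,
    // so "ho" may well be printed before Task 1 gets to run again.
    let task2 = tokio::spawn(async move {
        send.send_async(()).await.unwrap();
        println!("ho");
    });

    task1.await.unwrap();
    task2.await.unwrap();
}

Which of the two lines appears first depends entirely on the scheduler, which is exactly the point: flume only guarantees that a receiver was waiting, not that it has already run.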

You absolutely could, and flume already does.

As demonstrated, it does not. What it does guarantee is that for rendezvous queues (i.e. those with a bound of 0), a call to .send will place the task into a waiting state unless there is already a receiver waiting to receive an item from the queue.

What I'm describing is runtime-agnostic.

It is not. One could feasibly imagine reasonable scheduler algorithms that result in either Task 1 or Task 2 being run first after the send occurs (again, this applies to single-threaded runtimes too).

I would consider a rendezvous channel a synchronization primitive.

A rendezvous channel is not a synchronisation primitive in the sense that it does not guarantee a canonical post-rendezvous ordering for sender and receiver. Why? Because to guarantee such an ordering you need a critical section, and there is no critical section between the two. The instant the send(()) call occurs, both tasks immediately lose whatever relationship with one another they may have had, and that includes order of execution.

orlp commented 6 months ago

I understand the confusion now, I misread the code. You are right that flume::Sender immediately returns Poll::Ready if there is a reader waiting.

Why? Because to guarantee such an ordering, you need a critical section: and there is no critical section between the two. The instant the send(()) call occurs, both tasks immediately lose whatever relationship with one-another they may have had, and that includes order of execution.

I assumed an implementation where the sender is blocked until the receiver unblocks it. An implementation I wrote does have this feature, and I need it so I guess I can't use flume's bounded(0) channel. I thought I recognized a similar pattern in flume's code but I was wrong.

orlp commented 6 months ago

For the record, this does mean that flume's bounded(0) is effectively a strange 1-capacity channel. It can hold ownership of data indefinitely, and it can even 'lose' data, in that Drop is called on the data when the channel dies while still holding ownership.

Even though it is a rendezvous channel where the sender returns Err(T) "if all receivers have been dropped", values can get stuck in the channel if a receiver gets dropped between a send and a receive. I would expect a 'true' rendezvous channel to provide atomic ownership transfer from sender to receiver, but this is not what happens.

Consider this example:

#![feature(noop_waker)]

use std::pin::Pin;
use std::task::{Waker, Context};
use std::future::Future;
use futures::FutureExt; // provides .boxed()

#[derive(Debug)]
struct MayNeverDrop; // dropping one of these is a bug: Drop panics to detect it
impl Drop for MayNeverDrop {
    fn drop(&mut self) {
        panic!("dropped");
    }
}

async fn produces_neverdrops(s: flume::Sender<MayNeverDrop>) {
    loop {
        let p = MayNeverDrop;
        println!("produced");
        match s.send_async(p).await {
            Ok(()) => continue,
            Err(x) => {
                core::mem::forget(x);
                break;
            }
        }
    }
}

async fn consumes_neverdrops(r: flume::Receiver<MayNeverDrop>) {
    while let Ok(x) = r.recv_async().await {
        println!("consumed");
        core::mem::forget(x);
    }
}

fn main() {
    let (send, recv) = flume::bounded(0);
    let mut producer = produces_neverdrops(send).boxed();
    let mut consumer = consumes_neverdrops(recv).boxed();

    let dummy_waker = Waker::noop();
    let mut cx = Context::from_waker(&dummy_waker);

    // Alternately poll the consumer and producer by hand, simulating an executor.
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    println!("cancelling consumer");
    drop(consumer);
    println!("simulate join() on producer");
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    println!("leaving scope")
}

One might expect that, as long as you join() on the producer, you may cancel consumes_neverdrops at any time and the producer will clean up by taking back the last value from the Err and forgetting it. Furthermore, you might expect the log to say:

produced
consumed
produced
consumed

since our channel has zero capacity.

Instead we see the following:

[src/main.rs:44:5] Future::poll(Pin::new(&mut consumer), &mut cx) = Pending
produced
produced
[src/main.rs:45:5] Future::poll(Pin::new(&mut producer), &mut cx) = Pending
consumed
consumed
[src/main.rs:46:5] Future::poll(Pin::new(&mut consumer), &mut cx) = Pending
produced
produced
[src/main.rs:47:5] Future::poll(Pin::new(&mut producer), &mut cx) = Pending
consumed
consumed
[src/main.rs:48:5] Future::poll(Pin::new(&mut consumer), &mut cx) = Pending
produced
produced
[src/main.rs:49:5] Future::poll(Pin::new(&mut producer), &mut cx) = Pending
cancelling consumer
simulate join() on producer
thread 'main' panicked at src/main.rs:11:9:
dropped

One might wonder how we managed to pass TWO values per poll through a zero-capacity channel. That really looks like a 1-capacity channel to me.

orlp commented 6 months ago

To further support the idea that the current design of bounded(0) isn't a true bounded(0) channel but a strange bounded(1) channel, consider this example:

#![feature(noop_waker)] // for Waker::noop(), as in the previous example

use std::pin::Pin;
use std::task::{Waker, Context};
use std::future::Future;
use futures::FutureExt; // provides .boxed()

async fn producer(s: flume::Sender<()>) {
    loop {
        println!("produced");
        s.send_async(()).await.ok();
    }
}

async fn consumer(r: flume::Receiver<()>) {
    loop {
        r.recv_async().await.ok();
        println!("consumed");
    }
}

fn main() {
    for capacity in [3, 2, 1, 0] {
        let (send, recv) = flume::bounded(capacity);
        let mut producer = producer(send).boxed();
        let mut consumer = consumer(recv).boxed();

        let dummy_waker = Waker::noop();
        let mut cx = Context::from_waker(&dummy_waker);

        println!("capacity: {capacity}");
        let _ = Future::poll(Pin::new(&mut producer), &mut cx);
        let _ = Future::poll(Pin::new(&mut consumer), &mut cx);
        let _ = Future::poll(Pin::new(&mut producer), &mut cx);
        let _ = Future::poll(Pin::new(&mut consumer), &mut cx);
        println!("");
    }
}

This prints:

capacity: 3
produced
produced
produced
produced
consumed
consumed
consumed
consumed
produced
produced
produced
produced
consumed
consumed
consumed
consumed

capacity: 2
produced
produced
produced
consumed
consumed
consumed
produced
produced
produced
consumed
consumed
consumed

capacity: 1
produced
produced
consumed
consumed
produced
produced
consumed
consumed

capacity: 0
produced
consumed
produced
produced
consumed
consumed

As you would expect, for bounded(n) we produce n + 1 values, then block because the channel is full on the (n + 1)th value. Then the receiver accepts n + 1 values: n until the channel is empty, plus a final value taken directly from the blocked sender, after which it has to block until more values are available.

This pattern holds for all n, except 0, where we mysteriously see that we start producing and consuming two at a time, passing more than one value through the channel at a time even though the channel is supposed to have zero capacity.

zesterer commented 6 months ago

values can get stuck in the channel if a receiver gets dropped between a send and a receive

This is actually a broader issue in the async ecosystem, that of future cancellation. It's a very difficult problem to engineer around (even more so in the case of streams), and is the main thing that makes having flume support both sync and async in the same implementation quite difficult. The sync implementations don't need to deal with this since you can't remotely cancel a thread (or, at least, it's reasonable to expect lost values if this is done).

To further support the idea that the current design of bounded(0) isn't a true bounded(0) channel but a strange bounded(1) channel, consider this example

I don't think that this example demonstrates what you think it's demonstrating.

It is entirely possible for both produced and consumed to be printed twice in a row due to the scheduling issue (not really an issue) that I mentioned previously.

As I said, send(()).await on a rendezvous channel does not guarantee that the sending task will not do work until the receiving task has started doing work, nor should it: the only guarantee is that the sending task will not do work until there is a receiving task waiting to receive the message.

If you've got the impression that this is a guarantee made by flume, then I apologise: that's not what it's designed to do, and this isn't a bug.

If you want that sort of behaviour, then the thing to do is to send some sort of sync primitive to the receiving task. For example, the sending task might send a oneshot::Sender<()> to the receiving task, such that the receiving task can signal to the sending task when the value has been processed, allowing the sending task to wait until the right time to continue doing work.
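
Something along these lines would work (a rough sketch, assuming the futures crate's oneshot channel and executor; the task names and String payload are only illustrative):

use futures::channel::oneshot;

// Each message carries an acknowledgement channel alongside the payload.
type Msg = (String, oneshot::Sender<()>);

async fn sending_task(tx: flume::Sender<Msg>) {
    let (ack_tx, ack_rx) = oneshot::channel();
    // Hand the value over together with the ack sender.
    if tx.send_async(("hello".to_string(), ack_tx)).await.is_err() {
        return; // all receivers dropped
    }
    // Block here until the receiving task signals that the value was processed.
    let _ = ack_rx.await;
    println!("receiver has finished processing");
}

async fn receiving_task(rx: flume::Receiver<Msg>) {
    if let Ok((value, ack_tx)) = rx.recv_async().await {
        println!("processing {value}");
        // Signal the sending task that processing is complete.
        let _ = ack_tx.send(());
    }
}

fn main() {
    let (tx, rx) = flume::bounded(0);
    futures::executor::block_on(async {
        futures::join!(sending_task(tx), receiving_task(rx));
    });
}

The rendezvous channel still only guarantees that a receiver was waiting at the moment of hand-off; the extra oneshot acknowledgement is what turns that into "the receiver has finished processing".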

orlp commented 6 months ago

As I said, send(()).await on a rendezvous channel does not guarantee that the sending task will not do work until the receiving task has started doing work, nor should it: the only guarantee is that the sending task will not do work until there is a receiving task waiting to receive the message.

My initial opening of the issue was based on a misunderstanding of the current behavior, but I've since come to feel more strongly that my expected behavior is actually the correct behavior for a zero-capacity channel. A zero-capacity channel should never be able to end up in a position where it owns data, in my opinion, as it models something with no capacity whatsoever.

However, if your interpretation is that it's not incorrect, and thus not a bug, we can close this.

zesterer commented 6 months ago

I don't see this as the channel 'owning' data: just that the receiving task is being given ownership of data by the sending task, but is not yet in a state to process that change in ownership. It's an interesting and subtle distinction though, so might be worth further documentation.

ibraheemdev commented 6 months ago

std::sync::mpsc and crossbeam also implement the same type of rendezvous channel. A successful send only guarantees that recv has been called, not completed. The standard documentation is also a bit misleading about this distinction:

If the buffer size is 0, however, the channel becomes a rendezvous channel and it guarantees that the receiver has indeed received the data if this function returns success.

crossbeam is a bit clearer:

If called on a zero-capacity channel, this method will wait for a receive operation to appear on the other side of the channel.