rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

mpsc::channel missing event from different thread's sender #2747

Closed scw00 closed 1 year ago

scw00 commented 1 year ago

I found that mpsc::channel can miss some events when I send data from different threads. Below is the output of my debug prints, followed by the instrumented code.

scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 regist waker done Waker { data: 0x60000184c700, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(1)
scw00 missing waker ThreadId(6)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(1)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(6)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(8)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(6)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(6)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(6)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(8)
scw00 missing waker ThreadId(8)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(8)
scw00 missing waker ThreadId(8)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(8)
scw00 regist waker done Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(2)
scw00 take waker Waker { data: 0x60000184c690, vtable: 0x1028a3c38 } ThreadId(6)
scw00 missing waker ThreadId(6)
scw00 missing waker ThreadId(8)
scw00 missing waker ThreadId(8)
scw00 missing waker ThreadId(8)
scw00 missing waker ThreadId(8)
scw00 missing waker ThreadId(4)

impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                if msg.is_none() {
                    self.inner = None;
                }
                Poll::Ready(msg)
            }
            Poll::Pending => {
                println!(
                    "scw00 regist waker done {:?} {:?}",
                    cx.waker(),
                    std::thread::current().id()
                );
                // There are no messages to read, in this case, park.
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check queue again after parking to prevent race condition:
                // a message could be added to the queue after previous `next_message`
                // before `register` call.
                self.next_message()
            }
        }
    }
}

// Sender side. In futures-channel this method is defined on
// `UnboundedSenderInner<T>`, not inside the `Stream` impl above.
impl<T> UnboundedSenderInner<T> {
    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        if let Some(waker) = self.inner.recv_task.take() {
            println!(
                "scw00 take waker {:?} {:?}",
                waker,
                std::thread::current().id()
            );
            waker.wake()
        } else {
            // No waker is registered right now: the receiver is not parked.
            println!("scw00 missing waker {:?}", std::thread::current().id())
        }
    }
}
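
For anyone following the log above, here is a minimal std-only sketch of the same check, register, re-check pattern. It is not the futures-rs implementation: `Shared`, `CountingWaker`, `push_and_signal`, `poll_next` and `demo` are hypothetical names, and a `Mutex<Option<Waker>>` stands in for the real `AtomicWaker`. Under those assumptions it suggests that a "missing waker" on the sender side does not have to mean a lost message: the receiver may simply not be parked at that moment, and the message is still in the queue for its next poll.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the channel's shared state (not the futures-rs types).
struct Shared {
    queue: Mutex<VecDeque<u32>>,
    recv_task: Mutex<Option<Waker>>, // assumption: replaces `AtomicWaker`
}

/// Counts wake-ups so the example can observe them.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Sender side: push first, then wake the receiver if a waker is registered.
/// Returns true for "take waker" and false for "missing waker".
fn push_and_signal(shared: &Shared, msg: u32) -> bool {
    shared.queue.lock().unwrap().push_back(msg);
    let waker = shared.recv_task.lock().unwrap().take();
    match waker {
        Some(w) => {
            w.wake();
            true
        }
        None => false,
    }
}

/// Receiver side: check the queue, register the waker, then check again.
fn poll_next(shared: &Shared, cx: &mut Context<'_>) -> Poll<u32> {
    if let Some(msg) = shared.queue.lock().unwrap().pop_front() {
        return Poll::Ready(msg);
    }
    *shared.recv_task.lock().unwrap() = Some(cx.waker().clone());
    // Re-check: a message may have arrived between the first check and the registration.
    match shared.queue.lock().unwrap().pop_front() {
        Some(msg) => Poll::Ready(msg),
        None => Poll::Pending,
    }
}

/// Parks the receiver once, sends three messages, then drains the queue.
fn demo() -> (Vec<bool>, Vec<u32>, usize) {
    let shared = Shared {
        queue: Mutex::new(VecDeque::new()),
        recv_task: Mutex::new(None),
    };
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    // The queue is empty, so the receiver registers its waker and parks.
    assert!(poll_next(&shared, &mut cx).is_pending());

    // Three sends before the receiver is polled again.
    let found: Vec<bool> = (1..=3).map(|m| push_and_signal(&shared, m)).collect();

    // Once polled again, the receiver drains everything that was queued.
    let mut received = Vec::new();
    while let Poll::Ready(msg) = poll_next(&shared, &mut cx) {
        received.push(msg);
    }
    (found, received, counter.0.load(Ordering::SeqCst))
}

fn main() {
    let (found, received, wakes) = demo();
    println!("waker found per send: {:?}", found);
    println!("received: {:?}, wake-ups: {}", received, wakes);
}
```

In this sketch only the first send finds a registered waker. The other two report none, yet all three messages are received after a single wake-up. Whether that matches the situation in the report depends on how the receiver task is driven, which the log alone does not show.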