tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License

tokio::test mpsc block error #3494

Closed luyikk closed 3 years ago

luyikk commented 3 years ago

Version

cargo tree | grep tokio

tokio v1.1.1
    ├── bytes v1.0.1
    ├── memchr v2.3.4
    ├── mio v0.7.7
    │   ├── log v0.4.14
    │   │   └── cfg-if v1.0.0
    │   ├── miow v0.3.6
    │   │   ├── socket2 v0.3.19
    │   │   │   └── winapi v0.3.9
    │   │   └── winapi v0.3.9
    │   ├── ntapi v0.3.6
    │   │   └── winapi v0.3.9
    │   └── winapi v0.3.9
    ├── num_cpus v1.13.0
    │   └── libc v0.2.84
    ├── once_cell v1.5.2
    ├── parking_lot v0.11.1
    │   ├── instant v0.1.9
    │   │   └── cfg-if v1.0.0
    │   ├── lock_api v0.4.2
    │   │   └── scopeguard v1.1.0
    │   └── parking_lot_core v0.8.2
    │       ├── cfg-if v1.0.0
    │       ├── instant v0.1.9 (*)
    │       ├── smallvec v1.6.1
    │       └── winapi v0.3.9
    ├── pin-project-lite v0.2.4
    ├── tokio-macros v1.0.0 (proc-macro)
    │   ├── proc-macro2 v1.0.24
    │   │   └── unicode-xid v0.2.1
    │   ├── quote v1.0.8
    │   │   └── proc-macro2 v1.0.24 (*)
    │   └── syn v1.0.60
    │       ├── proc-macro2 v1.0.24 (*)
    │       ├── quote v1.0.8 (*)
    │       └── unicode-xid v0.2.1
    └── winapi v0.3.9

Platform: 64-bit (Windows)

Description

The test never finishes. Output:

test test2 ... test test2 has been running for over 60 seconds

I tried this code:

#[tokio::test]
pub async fn test2(){
    let (tx, mut rx) = mpsc::channel(1024);
    let move_tx=tx.clone();
    tokio::spawn(async move {
        move_tx.send(1).await.unwrap();
        move_tx.send(2).await.unwrap();
    });
   // drop(tx);
    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}

Darksonn commented 3 years ago

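With the drop(tx) call commented out, running forever is the behavior I would expect: recv() only returns None once every sender has been dropped, and tx is still alive at the last recv(). If you uncomment the drop(tx) call, the test completes: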

use tokio::sync::mpsc;

#[tokio::test]
pub async fn test2(){
    let (tx, mut rx) = mpsc::channel::<i32>(1024);
    let move_tx=tx.clone();
    tokio::spawn(async move {
        move_tx.send(1).await.unwrap();
        move_tx.send(2).await.unwrap();
    });
    drop(tx);
    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}
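
The same closing rule can be reproduced without Tokio. The sketch below uses the standard library's std::sync::mpsc as a stand-in for tokio::sync::mpsc (an assumption made only so that it needs no external crates); the function name outcome_after_two_values is hypothetical. It reports what the receiver sees after both values have arrived, with and without dropping the original sender:

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Sends 1 and 2 through a cloned sender, then reports what a third
/// receive observes. `drop_original` decides whether the original
/// sender is dropped before receiving (hypothetical helper for illustration).
fn outcome_after_two_values(drop_original: bool) -> Result<i32, RecvTimeoutError> {
    let (tx, rx) = mpsc::channel::<i32>();
    let move_tx = tx.clone();

    let worker = thread::spawn(move || {
        move_tx.send(1).unwrap();
        move_tx.send(2).unwrap();
        // `move_tx` is dropped here, when the closure returns.
    });
    worker.join().unwrap();

    // Either drop the original sender now, or keep it alive until the
    // end of this function, mirroring the commented-out `drop(tx)`.
    let _keep_alive = if drop_original {
        drop(tx);
        None
    } else {
        Some(tx)
    };

    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(rx.recv(), Ok(2));

    // A bounded wait stands in for the test that "runs forever".
    rx.recv_timeout(Duration::from_millis(200))
}
```

With drop_original set to true the third receive reports a disconnected channel, which corresponds to recv() returning None in the Tokio test. With false, the wait times out because one sender is still alive.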

luyikk commented 3 years ago

With drop(tx), that code completes. Strangely, though, the following code does not work:

            let (tx, mut rx) = unbounded_channel();
            for (index, udp_sock) in self.udp_contexts.iter().enumerate() {
                let recv_sock = udp_sock.recv.borrow_mut().take();
                if let Some(recv_sock) = recv_sock {
                    let move_data_tx = tx.clone();            
                    tokio::spawn(async move {
                        let mut buff = [0; BUFF_MAX_SIZE];
                        loop {
                            let res =
                                recv_sock.recv_from(&mut buff).await;

                            if let Ok((size, addr)) = res {
                                if let Err(er) = move_data_tx.send((index, addr, buff[..size].to_vec())) {
                                    .....
                                    break;
                                }
                            } else if let Err(er) = res {
                                ......
                            }
                        }
                    });
                }
            }
            drop(tx);  // drop tx
            while let Some((index,  addr, data)) = rx.recv().await {
               ......
            } 

This loop never receives any message in the test:

  while let Some((index,  addr, data)) = rx.recv().await {
               ......
  }

It works with tokio 0.2. I found this problem after upgrading to tokio 1.0.

Source: https://github.com/luyikk/udp_server/blob/master/src/udp_serv.rs#L296

luyikk commented 3 years ago

When I upgraded from Tokio 0.2 to Tokio 1.0, I found many problems with my UDP server and it began to stop working, especially with the mpsc unbounded_channel. It works when the data volume is small, but stops when the volume becomes large. I have repeatedly switched between Tokio 0.2 and Tokio 1.0 to verify this problem. @Darksonn

Darksonn commented 3 years ago

Can you please try changing this:

https://github.com/luyikk/udp_server/blob/e8822f63532e7bf272afe51b17f03ca109ad6af5/src/udp_serv.rs#L186-L190

to make it call UdpSocket::set_nonblocking.

fn create_async_udp_socket<A: ToSocketAddrs>(addr: &A) -> Result<UdpSocket, Box<dyn Error>> {
    let std_sock = Self::create_udp_socket(&addr)?;
    std_sock.set_nonblocking(true)?;
    let sock = UdpSocket::try_from(std_sock)?;
    Ok(sock)
}

The std to Tokio socket conversion methods were changed going from 0.2 to 1.x to require you to manually set the socket to nonblocking mode.
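
To see what the flag changes on the std side, here is a minimal sketch using only std::net::UdpSocket. The helper names bind_nonblocking and idle_recv_kind and the loopback address are assumptions for illustration and are not taken from udp_server. A socket in nonblocking mode returns an error of kind WouldBlock right away instead of parking the thread. That is the mode Tokio 1.x expects the socket to already be in when it is converted, since the documentation for UdpSocket::from_std leaves setting it to the caller.

```rust
use std::io::ErrorKind;
use std::net::UdpSocket;

/// Binds a std UDP socket on an ephemeral loopback port and switches
/// it to nonblocking mode (hypothetical helper for illustration).
fn bind_nonblocking() -> std::io::Result<UdpSocket> {
    let sock = UdpSocket::bind("127.0.0.1:0")?;
    sock.set_nonblocking(true)?;
    Ok(sock)
}

/// Attempts one receive on an idle socket and returns the error kind,
/// or `None` if a datagram happened to be available.
fn idle_recv_kind() -> std::io::Result<Option<ErrorKind>> {
    let sock = bind_nonblocking()?;
    let mut buf = [0u8; 64];
    // Nothing has been sent to this socket, so a nonblocking receive
    // returns immediately with an error rather than waiting.
    Ok(sock.recv_from(&mut buf).err().map(|e| e.kind()))
}
```

Without the set_nonblocking(true) call, the same recv_from would block the calling thread until a datagram arrives. Inside an async runtime that stalls the worker thread running the task.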

luyikk commented 3 years ago

std_sock.set_nonblocking(true)?;

Thank you very much.