rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Fix flake in buffer_unordered #1790

Open cramertj opened 5 years ago

cramertj commented 5 years ago

seen on this change: https://github.com/rust-lang-nursery/futures-rs/pull/1789 in this job: https://travis-ci.com/rust-lang-nursery/futures-rs/jobs/224053387

taiki-e commented 5 years ago

I tested it locally but could not reproduce it (Windows and Linux (WSL-Ubuntu)). As far as I can tell from the history (PRs and master), this problem does not seem to occur on Linux, so I think this is either an issue specific to macOS or a Travis issue.

Travis's macOS builds have recently been very unstable regardless of this test (https://github.com/rust-lang-nursery/futures-rs/pull/1787#issuecomment-519628394), so for now I'm guessing this is a Travis issue.

taiki-e commented 4 years ago

(The OS looks irrelevant, but I have never been able to reproduce it locally.)

kentfredric commented 4 years ago

I've been reproducing this on Linux with futures 0.1.29. The problem is that it's very ephemeral: the test passes about 99% of the time.

I can't make it do this on nightly rust, but I can do it on current stable rust.
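For a failure this rare, one way to catch it is to run the suspect code in a loop with a watchdog timeout. The following is only a sketch using the standard library: `first_failure` is a hypothetical helper name, and the empty closure in `main` is a placeholder for the actual `buffer_unordered` test body, which is not reproduced here.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `body` on a fresh thread up to `iterations` times.
/// Returns the index of the first run that does not finish within `timeout`.
/// A panicking run also counts as a failure, because its completion message
/// is never sent. A run that hangs leaves its thread behind (leaked).
fn first_failure<F>(iterations: usize, timeout: Duration, body: F) -> Option<usize>
where
    F: Fn() + Send + Clone + 'static,
{
    for i in 0..iterations {
        let (done_tx, done_rx) = mpsc::channel();
        let run = body.clone();
        thread::spawn(move || {
            run();
            // Signal completion; ignore the error if the watchdog gave up.
            let _ = done_tx.send(());
        });
        if done_rx.recv_timeout(timeout).is_err() {
            return Some(i);
        }
    }
    None
}

fn main() {
    // Placeholder body: substitute the code under test here.
    let result = first_failure(1000, Duration::from_secs(5), || {});
    println!("first failing iteration: {:?}", result);
}
```

When an iteration times out, the process can be kept alive (for example by sleeping in `main`) so that gdb can be attached to the stuck threads.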

Attaching gdb once it deadlocks gives me:

gdb thread apply all bt full

```
Thread 4 (Thread 0x7f9d6d721700 (LWP 9458)):
#0  futex_wait_cancelable (private=0, expected=0, futex_word=0x7f9d64000cac) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
        __ret = -512 oldtype = 0 err = oldtype = err = __ret = resultvar = __arg4 = __arg3 = __arg2 = __arg1 = _a4 = _a3 = _a2 = _a1 =
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x7f9d64000b20, cond=0x7f9d64000c80) at pthread_cond_wait.c:502
        spin = 0 buffer = {__routine = 0x7f9d6de63b80 <__condvar_cleanup_waiting>, __arg = 0x7f9d6d720a10, __canceltype = 1836190128, __prev = 0x0} cbuffer = {wseq = 11, cond = 0x7f9d64000c80, mutex = 0x7f9d64000b20, private = 0} rt = err = g = 1 flags = g1_start = signals = result = 0 wseq = 11 seq = 5 private = 0 maxspin = err = result = wseq = g = seq = flags = private = signals = g1_start = spin = buffer = cbuffer = rt = s =
#2  __pthread_cond_wait (cond=0x7f9d64000c80, mutex=0x7f9d64000b20) at pthread_cond_wait.c:655
        No locals.
#3  0x0000561614377c8d in futures::task_impl::std::ThreadNotify::park ()
        No symbol table info available.
#4  0x000056161432c36a in std::sys_common::backtrace::__rust_begin_short_backtrace ()
        No symbol table info available.
#5  0x000056161432e1bc in std::panicking::try::do_call ()
        No symbol table info available.
#6  0x0000561614388a4a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:80
        No locals.
#7  0x000056161433a55f in core::ops::function::FnOnce::call_once{{vtable-shim}} ()
        No symbol table info available.
#8  0x000056161437bcdf in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#9  0x0000561614388110 in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#10 std::sys_common::thread::start_thread () at src/libstd/sys_common/thread.rs:13
        No locals.
#11 std::sys::unix::thread::Thread::new::thread_start () at src/libstd/sys/unix/thread.rs:79
        No locals.
#12 0x00007f9d6de5d3a7 in start_thread (arg=) at pthread_create.c:486
        ret = pd = now = unwind_buf = {cancel_jmp_buf = {{jmp_buf = {140314122786560, -2814752794425973897, 140314126984062, 140314126984063, 140314122784448, 140314126984320, 2870244160037954423, 2870243169132979063}, mask_was_saved = 0}}, priv = {pad = {0x0, 0x0, 0x0, 0x0}, data = {prev = 0x0, cleanup = 0x0, canceltype = 0}}} not_first_call =
#13 0x00007f9d6dd720ff in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
        No locals.

Thread 3 (Thread 0x7f9d6d922700 (LWP 9457)):
#0  futex_wait_cancelable (private=0, expected=0, futex_word=0x7f9d60000cac) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
        __ret = -512 oldtype = 0 err = oldtype = err = __ret = resultvar = __arg4 = __arg3 = __arg2 = __arg1 = _a4 = _a3 = _a2 = _a1 =
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x7f9d60000b20, cond=0x7f9d60000c80) at pthread_cond_wait.c:502
        spin = 0 buffer = {__routine = 0x7f9d6de63b80 <__condvar_cleanup_waiting>, __arg = 0x7f9d6d921890, __canceltype = 339515576, __prev = 0x0} cbuffer = {wseq = 7, cond = 0x7f9d60000c80, mutex = 0x7f9d60000b20, private = 0} rt = err = g = 1 flags = g1_start = signals = result = 0 wseq = 7 seq = 3 private = 0 maxspin = err = result = wseq = g = seq = flags = private = signals = g1_start = spin = buffer = cbuffer = rt = s =
#2  __pthread_cond_wait (cond=0x7f9d60000c80, mutex=0x7f9d60000b20) at pthread_cond_wait.c:655
        No locals.
#3  0x0000561614377c8d in futures::task_impl::std::ThreadNotify::park ()
        No symbol table info available.
#4  0x000056161432d263 in std::thread::local::LocalKey::with ()
        No symbol table info available.
#5  0x000056161432ea05 in futures::future::Future::wait ()
        No symbol table info available.
#6  0x000056161432cc95 in std::sys_common::backtrace::__rust_begin_short_backtrace ()
        No symbol table info available.
#7  0x000056161432e1fe in std::panicking::try::do_call ()
        No symbol table info available.
#8  0x0000561614388a4a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:80
        No locals.
#9  0x000056161433a733 in core::ops::function::FnOnce::call_once{{vtable-shim}} ()
        No symbol table info available.
#10 0x000056161437bcdf in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#11 0x0000561614388110 in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#12 std::sys_common::thread::start_thread () at src/libstd/sys_common/thread.rs:13
        No locals.
#13 std::sys::unix::thread::Thread::new::thread_start () at src/libstd/sys/unix/thread.rs:79
        No locals.
#14 0x00007f9d6de5d3a7 in start_thread (arg=) at pthread_create.c:486
        ret = pd = now = unwind_buf = {cancel_jmp_buf = {{jmp_buf = {140314124887808, -2814752794425973897, 140314126984014, 140314126984015, 140314124885696, 140314126984272, 2870242235355734903, 2870243169132979063}, mask_was_saved = 0}}, priv = {pad = {0x0, 0x0, 0x0, 0x0}, data = {prev = 0x0, cleanup = 0x0, canceltype = 0}}} not_first_call =
#15 0x00007f9d6dd720ff in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
        No locals.

Thread 2 (Thread 0x7f9d6db23700 (LWP 9456)):
#0  futex_wait_cancelable (private=0, expected=0, futex_word=0x561615133cd8) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
        __ret = -512 oldtype = 0 err = oldtype = err = __ret = resultvar = __arg4 = __arg3 = __arg2 = __arg1 = _a4 = _a3 = _a2 = _a1 =
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x561615132c30, cond=0x561615133cb0) at pthread_cond_wait.c:502
        spin = 0 buffer = {__routine = 0x7f9d6de63b80 <__condvar_cleanup_waiting>, __arg = 0x7f9d6db22340, __canceltype = 0, __prev = 0x0} cbuffer = {wseq = 16, cond = 0x561615133cb0, mutex = 0x561615132c30, private = 0} rt = err = g = 0 flags = g1_start = signals = result = 0 wseq = 16 seq = 8 private = 0 maxspin = err = result = wseq = g = seq = flags = private = signals = g1_start = spin = buffer = cbuffer = rt = s =
#2  __pthread_cond_wait (cond=0x561615133cb0, mutex=0x561615132c30) at pthread_cond_wait.c:655
        No locals.
#3  0x000056161437c793 in std::sys::unix::condvar::Condvar::wait () at src/libstd/sys/unix/condvar.rs:71
        No locals.
#4  std::sys_common::condvar::Condvar::wait () at src/libstd/sys_common/condvar.rs:41
        No locals.
#5  std::sync::condvar::Condvar::wait () at src/libstd/sync/condvar.rs:204
        No locals.
#6  std::thread::park () at src/libstd/thread/mod.rs:911
        No locals.
#7  0x00005616143828d2 in std::sync::mpsc::blocking::WaitToken::wait () at src/libstd/sync/mpsc/blocking.rs:71
        No locals.
#8  0x00005616143249b1 in std::sync::mpsc::stream::Packet::recv ()
        No symbol table info available.
#9  0x0000561614333028 in std::sync::mpsc::Receiver::recv ()
        No symbol table info available.
#10 0x0000561614328607 in core::ops::function::FnOnce::call_once ()
        No symbol table info available.
#11 0x000056161434637f in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#12 0x0000561614388a4a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:80
        No locals.
#13 0x0000561614360b8e in std::panicking::try () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/panicking.rs:271
        No locals.
#14 std::panic::catch_unwind () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/panic.rs:394
        No locals.
#15 test::run_test::run_test_inner::{{closure}} () at src/libtest/lib.rs:1413
        No locals.
#16 0x000056161433bbf5 in std::sys_common::backtrace::__rust_begin_short_backtrace () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/sys_common/backtrace.rs:126
        No locals.
#17 0x000056161433fdf5 in std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/mod.rs:470
        No locals.
#18 as core::ops::function::FnOnce<()>>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/panic.rs:315
        No locals.
#19 std::panicking::try::do_call () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/panicking.rs:292
        No locals.
#20 0x0000561614388a4a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:80
        No locals.
#21 0x0000561614340362 in std::panicking::try () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/panicking.rs:271
        No locals.
#22 std::panic::catch_unwind () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/panic.rs:394
        No locals.
#23 std::thread::Builder::spawn_unchecked::{{closure}} () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/thread/mod.rs:469
        No locals.
#24 core::ops::function::FnOnce::call_once{{vtable-shim}} () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libcore/ops/function.rs:227
        No locals.
#25 0x000056161437bcdf in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#26 0x0000561614388110 in as core::ops::function::FnOnce>::call_once () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/liballoc/boxed.rs:922
        No locals.
#27 std::sys_common::thread::start_thread () at src/libstd/sys_common/thread.rs:13
        No locals.
#28 std::sys::unix::thread::Thread::new::thread_start () at src/libstd/sys/unix/thread.rs:79
        No locals.
#29 0x00007f9d6de5d3a7 in start_thread (arg=) at pthread_create.c:486
        ret = pd = now = unwind_buf = {cancel_jmp_buf = {{jmp_buf = {140314126989056, -2814752794425973897, 140734155202382, 140734155202383, 140314126986944, 140734155202640, 2870242511844254583, 2870243169132979063}, mask_was_saved = 0}}, priv = {pad = {0x0, 0x0, 0x0, 0x0}, data = {prev = 0x0, cleanup = 0x0, canceltype = 0}}} not_first_call =
#30 0x00007f9d6dd720ff in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
        No locals.

Thread 1 (Thread 0x7f9d6db24840 (LWP 9455)):
#0  futex_reltimed_wait_cancelable (private=0, reltime=0x7fff39542410, expected=0, futex_word=0x561615132a98) at ../sysdeps/unix/sysv/linux/futex-internal.h:142
        __ret = -516 oldtype = 0 err = oldtype = err = __ret = resultvar = __arg4 = __arg3 = __arg2 = __arg1 = _a4 = _a3 = _a2 = _a1 =
#1  __pthread_cond_wait_common (abstime=0x7fff395424e8, mutex=0x561615132a20, cond=0x561615132a70) at pthread_cond_wait.c:533
        rt = {tv_sec = 59, tv_nsec = 999910039} spin = 0 buffer = {__routine = 0x7f9d6de63b80 <__condvar_cleanup_waiting>, __arg = 0x7fff39542440, __canceltype = 1844400740, __prev = 0x0} cbuffer = {wseq = 0, cond = 0x561615132a70, mutex = 0x561615132a20, private = 0} err = g = 0 flags = 2 g1_start = maxspin = 0 signals = result = 0 wseq = 0 seq = 0 private = 0 maxspin = err = result = wseq = g = seq = flags = private = signals = g1_start = spin = buffer = cbuffer = rt = s =
#2  __pthread_cond_timedwait (cond=0x561615132a70, mutex=0x561615132a20, abstime=0x7fff395424e8) at pthread_cond_wait.c:667
        No locals.
#3  0x0000561614387061 in std::sys::unix::condvar::Condvar::wait_timeout () at src/libstd/sys/unix/condvar.rs:102
        No locals.
#4  0x000056161437cb52 in std::sys_common::condvar::Condvar::wait_timeout () at src/libstd/sys_common/condvar.rs:51
        No locals.
#5  std::sync::condvar::Condvar::wait_timeout () at src/libstd/sync/condvar.rs:405
        No locals.
#6  std::thread::park_timeout () at src/libstd/thread/mod.rs:1006
        No locals.
#7  0x00005616143829fd in std::sync::mpsc::blocking::WaitToken::wait_max_until () at src/libstd/sync/mpsc/blocking.rs:82
        No locals.
#8  0x000056161433e56e in std::sync::mpsc::shared::Packet::recv () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/sync/mpsc/shared.rs:229
        No locals.
#9  0x0000561614358d78 in std::sync::mpsc::Receiver::recv_deadline () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/sync/mpsc/mod.rs:1403
        No locals.
#10 std::sync::mpsc::Receiver::recv_timeout () at /rustc/4560ea788cb760f0a34127156c78e2552949f734/src/libstd/sync/mpsc/mod.rs:1313
        No locals.
#11 test::run_tests () at src/libtest/lib.rs:1125
        No locals.
#12 test::run_tests_console () at src/libtest/lib.rs:957
        No locals.
#13 0x000056161434f992 in test::test_main () at src/libtest/lib.rs:295
        No locals.
#14 0x00005616143501ad in test::test_main_static () at src/libtest/lib.rs:329
        No locals.
#15 0x000056161432eb13 in std::rt::lang_start::{{closure}} ()
        No symbol table info available.
#16 0x0000561614385cb3 in std::rt::lang_start_internal::{{closure}} () at src/libstd/rt.rs:49
        No locals.
#17 std::panicking::try::do_call () at src/libstd/panicking.rs:292
        No locals.
#18 0x0000561614388a4a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:80
        No locals.
#19 0x000056161438677d in std::panicking::try () at src/libstd/panicking.rs:271
        No locals.
#20 std::panic::catch_unwind () at src/libstd/panic.rs:394
        No locals.
#21 std::rt::lang_start_internal () at src/libstd/rt.rs:48
        No locals.
#22 0x0000561614329ad2 in main ()
        No symbol table info available.
```

Hopefully something in that mess is relevant.

tmiasko commented 4 years ago

This issue happens to be easily reproducible on my system. The deadlocked situation is as follows:

T3 should be able to proceed successfully, since the channel has a capacity of 1 plus one additional slot for the sender. Unfortunately, the implementation of per-sender capacity is racy, and it is possible for a sender to be incorrectly parked even though free capacity is available. If the receiver is being polled, the situation eventually corrects itself. But in this case rx is not being polled, so the problem persists.
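To make the capacity rule concrete, here is a deliberately simplified, single-threaded model of "buffer plus one guaranteed slot per sender". It is an illustration only, not the futures-rs implementation: `ToyChannel`, `SendOutcome`, `try_send` and `recv` are hypothetical names, and because nothing runs concurrently the model cannot exhibit the race itself. It only shows why a parked sender stays stuck unless the receiver is polled.

```rust
/// Toy model of a bounded channel: `buffer` shared slots plus one
/// guaranteed slot per sender. NOT the futures-rs implementation.
struct ToyChannel {
    buffer: usize,
    queued: usize,
    parked: Vec<bool>, // parked[i] is true while sender i has to wait
}

#[derive(Debug, PartialEq)]
enum SendOutcome {
    Sent,          // message queued, sender may send again
    SentAndParked, // message queued via the sender's own slot, sender now parked
    Pending,       // sender is parked, nothing was queued
}

impl ToyChannel {
    fn new(buffer: usize, senders: usize) -> Self {
        ToyChannel { buffer, queued: 0, parked: vec![false; senders] }
    }

    fn try_send(&mut self, sender: usize) -> SendOutcome {
        if self.parked[sender] {
            return SendOutcome::Pending;
        }
        self.queued += 1;
        if self.queued > self.buffer {
            // The shared buffer is full: the sender used its guaranteed slot.
            self.parked[sender] = true;
            SendOutcome::SentAndParked
        } else {
            SendOutcome::Sent
        }
    }

    /// Models one receiver poll: takes a message and unparks one sender.
    /// Returns false if the queue was empty.
    fn recv(&mut self) -> bool {
        if self.queued == 0 {
            return false;
        }
        self.queued -= 1;
        if let Some(p) = self.parked.iter_mut().find(|p| **p) {
            *p = false;
        }
        true
    }
}

fn main() {
    let mut ch = ToyChannel::new(1, 1);
    println!("{:?}", ch.try_send(0)); // fits in the shared buffer
    println!("{:?}", ch.try_send(0)); // uses the per-sender slot
    println!("{:?}", ch.try_send(0)); // parked until the receiver polls
    ch.recv();
    println!("{:?}", ch.try_send(0)); // accepted again after a receive
}
```

In this model the only way out of `Pending` is a call to `recv`, which mirrors the point above: a sender that ends up parked, correctly or not, makes no progress while the receiver is not being polled.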

An execution that leads to an mpsc bounded sender being incorrectly parked:

The receiver is woken up afterwards, but it isn't being polled, so that doesn't help.