conradludgate / futures-buffered

Intermittent hangs #5

Open v1gnesh opened 3 months ago

v1gnesh commented 3 months ago

Firstly, thank you for sharing your work :heart:

Inside a tokio async main, I'm using it like this:

use futures_buffered::BufferedStreamExt;

let mut stream = framed_read_stream
    .map(|x| do_stuff(x))
    .buffered_ordered(4096);

while let Some(recvd) = stream.next().await {
}

It just seems to hang, like 80% of the time (i.e., 80% of the time I run the program). How do I debug this?

conradludgate commented 3 months ago

Curious. Do you happen to know what operation "do_stuff" gets stuck on specifically?

conradludgate commented 3 months ago

Although it shouldn't matter here, there is a bug with BufferedOrdered I've just found where the size_hint is incorrect: https://github.com/conradludgate/futures-buffered/blob/2384ae2170edfa32a0bf4b2ba116bcfce39dbae7/src/buffered/ordered.rs#L77

v1gnesh commented 3 months ago

do_stuff creates a tx/rx channel pair, spawns tasks onto a pre-created rayon global thread pool, handing them the tx, and then returns the rx. So stream is a Stream of Receivers.
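For anyone trying to reproduce this, here is a rough, std-only sketch of the shape described above. It is not the real code: `std::thread::spawn` stands in for the rayon global pool, `std::sync::mpsc` stands in for whatever channel type is actually used, the doubling is placeholder work, and `drain_in_order` is a hypothetical blocking analogue of the `while let Some(..) = stream.next().await` loop. Unlike `buffered_ordered(4096)`, it starts every job up front and puts no cap on the number in flight.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Hypothetical stand-in for `do_stuff`: create a channel, hand the Sender
// to a worker, and return the Receiver immediately.
// Assumption: a plain OS thread replaces the rayon global thread pool.
fn do_stuff(x: u32) -> Receiver<u32> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Placeholder work; the real computation is not shown in this thread.
        // A send error only means the Receiver was dropped, so it is ignored.
        let _ = tx.send(x * 2);
    });
    rx
}

// Blocking analogue of draining the ordered stream: build every Receiver
// first, then read the results back in input order.
fn drain_in_order(inputs: Vec<u32>) -> Vec<u32> {
    let receivers: Vec<Receiver<u32>> = inputs.into_iter().map(do_stuff).collect();
    receivers
        .into_iter()
        .map(|rx| rx.recv().expect("worker dropped its Sender without sending"))
        .collect()
}
```

With `std::sync::mpsc`, a worker that drops its Sender without sending makes `recv` return an error instead of blocking forever, so in this simplified version a lost result shows up as a panic rather than a silent stall. Whether the same holds in the real program depends on the channel type in use.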

conradludgate commented 3 months ago

I think I found a subtle race condition in the waker implementation. Just published a potential fix.

v1gnesh commented 3 months ago

Still stalls :( Works with buffered(1) but you might expect that to work already.

conradludgate commented 3 months ago

Hmm. Unfortunate. I guess I'll keep digging. I've not been able to reproduce any stalls myself yet.