danieldg / ordered-stream

JoinMultiple doesn't seem to work at all #10

zeenix closed 1 year ago

zeenix commented 1 year ago

I was trying to use JoinMultiple in zbus and kept getting a complete hang. After hours of investigation, I realized that it is actually the JoinMultiple impl that fails. I created a test case using code from the example, and it hangs there as well:

    pub struct RemoteLogSource {
        stream: Pin<Box<dyn Stream<Item = Message>>>,
    }

    let mut logs = [
        RemoteLogSource {
            stream: Box::pin(futures_util::stream::iter([
                Message { serial: 1 },
                Message { serial: 3 },
                Message { serial: 5 },
            ])),
        },
        RemoteLogSource {
            stream: Box::pin(futures_util::stream::iter([
                Message { serial: 2 },
                Message { serial: 4 },
                Message { serial: 6 },
            ])),
        },
    ];
    let streams: Vec<_> = logs
        .iter_mut()
        .map(|s| FromStream::with_ordering(&mut s.stream, |m| m.serial).peekable())
        .collect();
    let mut joined = JoinMultiple(streams);
    for i in 0..6 {
        let msg = joined.next().await.unwrap();
        assert_eq!(msg.serial, i as u32 + 1);
    }

A similar test case based on Join works fine:

    let stream1 = Box::pin(futures_util::stream::iter([
        Message { serial: 1 },
        Message { serial: 3 },
        Message { serial: 5 },
    ]));

    let stream2 = Box::pin(futures_util::stream::iter([
        Message { serial: 2 },
        Message { serial: 4 },
        Message { serial: 6 },
    ]));
    let mut joined = join(
        FromStream::with_ordering(stream1, |m| m.serial).peekable(),
        FromStream::with_ordering(stream2, |m| m.serial).peekable(),
    );
    for i in 0..6 {
        println!("{}", i);
        let msg = joined.next().await.unwrap();
        assert_eq!(msg.serial, i as u32 + 1);
    }
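
For reference, the ordering that both test cases assert can be sketched without the crate at all. The following is a plain synchronous k-way merge over already-sorted sources, using only the standard library. It is not how ordered-stream implements JoinMultiple or Join; `merge_by_serial` is a hypothetical helper, and `Message` is an assumed stand-in for the type used in the snippets above (a struct with a single `u32` serial).

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Assumed stand-in for the `Message` type in the test cases above.
#[derive(Debug, PartialEq)]
pub struct Message {
    pub serial: u32,
}

// Hypothetical helper: merge sources that are each already sorted by `serial`
// into a single Vec sorted by `serial`. Purely synchronous; no streams involved.
pub fn merge_by_serial(sources: Vec<Vec<Message>>) -> Vec<Message> {
    let mut iters: Vec<_> = sources.into_iter().map(|s| s.into_iter()).collect();

    // The current head (next unconsumed message) of every source.
    let mut heads: Vec<Option<Message>> = iters.iter_mut().map(|it| it.next()).collect();

    // Min-heap of (serial, source index), one entry per non-empty source.
    let mut heap: BinaryHeap<Reverse<(u32, usize)>> = heads
        .iter()
        .enumerate()
        .filter_map(|(i, head)| head.as_ref().map(|m| Reverse((m.serial, i))))
        .collect();

    let mut out = Vec::new();
    while let Some(Reverse((_, i))) = heap.pop() {
        // Emit the head of the source that holds the smallest serial.
        if let Some(msg) = heads[i].take() {
            out.push(msg);
        }
        // Refill that source's head and re-register it in the heap.
        heads[i] = iters[i].next();
        if let Some(next) = &heads[i] {
            heap.push(Reverse((next.serial, i)));
        }
    }
    out
}
```

Given the two sources above (serials 1, 3, 5 and 2, 4, 6), `merge_by_serial` returns the messages with serials 1 through 6 in order, which is the sequence both loops assert against.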