aatxe / irc

the irc crate – usable, async IRC for Rust
Mozilla Public License 2.0
530 stars 97 forks source link

Polling instead of foreach #89

Closed Yatekii closed 7 years ago

Yatekii commented 7 years ago

Hey!

I was trying to combine the IRC and the Telegram lib and thus it would not really be feasible to use a foreach since it's blocking. So I was trying to use poll(). Sadly it dies after one message with:

"rdy"
"msg"
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', /Users/rustbuild/src/rust-buildbot/slave/beta-dist-rustc-mac/build/src/libcore/option.rs:323
stack backtrace:
   1:        0x104e26a7a - std::sys::imp::backtrace::tracing::imp::write::hbb6a94eade90b732
   2:        0x104e2885f - std::panicking::default_hook::{{closure}}::h0cb8f299c05ede1d
   3:        0x104e28507 - std::panicking::default_hook::h34b925d635468757
   4:        0x104e28d26 - std::panicking::rust_panic_with_hook::hdf1ac23b3d72a5c1
   5:        0x104e28b74 - std::panicking::begin_panic::h1322dc254aaf2333
   6:        0x104e28ae2 - std::panicking::begin_panic_fmt::h6d0c278aad886897
   7:        0x104e28a47 - rust_begin_unwind
   8:        0x104e4b7e0 - core::panicking::panic_fmt::h34435f30a3834ce1
   9:        0x104e4b6e4 - core::panicking::panic::hfc671ef5e1f91028
  10:        0x104c5bd11 - <core::option::Option<T>>::unwrap::h77235f242451d000
  11:        0x104cfd5df - <irc::client::server::IrcServer as irc::client::server::Server>::stream::h62da4b89a1143315
  12:        0x104c3ee0e - bastli_bot::main::hf022bd50bf531595
  13:        0x104e29c0a - __rust_maybe_catch_panic
  14:        0x104e290c6 - std::rt::lang_start::hcdf303e938e3ed64
  15:        0x104c3f819 - main

The code:

extern crate irc;

#[macro_use]
extern crate futures;

use futures::Async;

// use std::thread;

use std::default::Default;
use irc::client::prelude::*;

fn main() {
    // let irc_thread = thread::spawn(|| {
        let cfg = Config {
            nickname: Some(format!("BastliBot")),
            server: Some(format!("irc.freenode.net")),
            channels: Some(vec![format!("#bastli")]),
            .. Default::default()
        };
        let server = IrcServer::from_config(cfg).unwrap();
        server.identify().unwrap();
        loop {
            match server.stream().poll() {
                Ok(msg) => {
                    match msg {
                        Async::Ready(None) => println!("{:?}", "not"),
                        Async::Ready(_v) => println!("{:?}", "rdy"),
                        Async::NotReady => println!("{:?}", "not rdy")
                    }
                    println!("{:?}", "msg");
                }
                Err(_err) => println!("{:?}", "Moep")
            }
        }
    // });
    println!("{:?}", "KEK");
    // let _ = irc_thread.join();
}

Can anyone help me? People in #rust suggested it's a bug in the lib ...

Thanks Yatekii

aatxe commented 7 years ago

Sorry, I failed to document an important attribute of stream() here. You can only call it once on a server. So, you'll need to store it in a variable first.

aatxe commented 7 years ago

In more detail: The restriction here is that I can only ever have one copy of the receiving (i.e. Stream) half of the TcpStream. The stream we return in stream() is a wrapper of this stream. I can change the behavior of this function, but I'm not sure that doing so makes sense (as it might lead to unpredictable behavior). The way I see it there are three options (with variations possible):

  1. Leave the API the same – you can only get one stream.
  2. Provide two functions, stream_mut() which gives you a mutable reference, and take_stream() which behaves like the current one does.
  3. Our stream wrapper ServerStream currently takes the internal stream type. It could instead take an Arc<Mutex<...>> of the internal stream type and we could make one per call.

1 and 2 are not dramatically different, but there's an obvious variant on both of them which is to change stream()/take_stream() to have failure in its type somehow, whether that be returning Option, Result, or otherwise.

3 is obviously nice in that it appears to just work, but I'm hesitant to do it because I think the behavior of multiple streams might be surprising. That is, if you had two outstanding streams both reading stuff, whichever one polled first (after availability) would get the message and you might be surprised to find that the other one never sees that message.

Yatekii commented 7 years ago

Ok thanks for the elaborate answer!

For myself, one stream is more than enough I guess. I am also too much a noob in Rust to be able to actively judge.

Also my current code:

fn main() {
    // let irc_thread = thread::spawn(|| {
        let cfg = Config {
            nickname: Some(format!("BastliBot")),
            server: Some(format!("irc.freenode.net")),
            channels: Some(vec![format!("#bastli")]),
            .. Default::default()
        };
        let server = IrcServer::from_config(cfg).unwrap();
        server.identify().unwrap();
        let mut stream = server.stream();
        loop {
            match stream.poll() {
                Ok(msg) => {
                    match msg {
                        Async::Ready(None) => println!("{:?}", "not"),
                        Async::Ready(_v) => println!("{:?}", "rdy"),
                        Async::NotReady => println!("{:?}", "not rdy")
                    }
                    println!("{:?}", "msg");
                }
                Err(_err) => println!("{:?}", "Moep")
            }
        }
    // });
    // let _ = irc_thread.join();
}

Still results in panic ;)

thread 'main' panicked at 'no Task is currently running', /Users/yatekii/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.14/src/task_impl/mod.rs:44
stack backtrace:
   1:        0x10d632a8a - std::sys::imp::backtrace::tracing::imp::write::hbb6a94eade90b732
   2:        0x10d63486f - std::panicking::default_hook::{{closure}}::h0cb8f299c05ede1d
   3:        0x10d634517 - std::panicking::default_hook::h34b925d635468757
   4:        0x10d634d36 - std::panicking::rust_panic_with_hook::hdf1ac23b3d72a5c1
   5:        0x10d5ffa43 - std::panicking::begin_panic::h27901a6c38a82e88
   6:        0x10d6054e0 - futures::task_impl::with::h7e46c57b64fa1057
   7:        0x10d60553b - futures::task_impl::current::h7965b5cc18b46f5f
   8:        0x10d56c8d7 - tokio_core::reactor::io_token::IoToken::schedule_read::hc47178a331a82324
   9:        0x10d56d2c4 - <tokio_core::reactor::poll_evented::PollEvented<E>>::need_read::h2714d6e528b5d932
  10:        0x10d56d210 - <tokio_core::reactor::poll_evented::PollEvented<E>>::poll_ready::h2c9205e4ff39509a
  11:        0x10d56cf85 - <tokio_core::reactor::poll_evented::PollEvented<E>>::poll_read::hdf229e607757ad69
  12:        0x10d56d3df - <tokio_core::reactor::poll_evented::PollEvented<E> as std::io::Read>::read::hf11244c6d17be31a
  13:        0x10d56b77b - <tokio_core::net::tcp::TcpStream as std::io::Read>::read::h103e267988d20f42
  14:        0x10d4c853b - <tokio_io::framed::Fuse<T, U> as std::io::Read>::read::hcbbff68fa1ee0c7b
  15:        0x10d4d1acb - <tokio_io::framed_write::FramedWrite2<T> as std::io::Read>::read::h8233e64c3a2e8f7f
  16:        0x10d4e7b1f - tokio_io::AsyncRead::read_buf::h91aa75f90327dbc1
  17:        0x10d4e4ab2 - <tokio_io::framed_read::FramedRead2<T> as futures::stream::Stream>::poll::hafeac423487899ca
  18:        0x10d4e243b - <tokio_io::framed::Framed<T, U> as futures::stream::Stream>::poll::hd15a67fb5adae8d8
  19:        0x10d50c075 - <irc::client::transport::IrcTransport<T> as futures::stream::Stream>::poll::hac97438c05dccb90
  20:        0x10d4ff325 - <irc::client::conn::Connection as futures::stream::Stream>::poll::h311104dde2957274
  21:        0x10d4e5f93 - <futures::stream::split::SplitStream<S> as futures::stream::Stream>::poll::haee26b9518f0b28c
  22:        0x10d50262f - <irc::client::server::ServerStream as futures::stream::Stream>::poll::hc638c3835bdfc56f
  23:        0x10d44ae2f - bastli_bot::main::hf022bd50bf531595
  24:        0x10d635c1a - __rust_maybe_catch_panic
  25:        0x10d6350d6 - std::rt::lang_start::hcdf303e938e3ed64
  26:        0x10d44b829 - main

Guess the noob clause holds true ;)

EDIT: On multiple streams though: why would you want that? Can't this break things horribly because you basically duplicate the TCP connection?

aatxe commented 7 years ago

So, I've been looking at a bunch of issues on the Futures repository related to this sort of thing.

I think this one is notable: https://github.com/alexcrichton/futures-rs/issues/136. Here are some choice quotes (from @alexcrichton):

For poll, it is indeed difficult to call Future::poll directly, and you are indeed supposed to call task::spawn first.

I don't think we should document poll as "you should never call this" because if you're implementing poll itself it's perfectly ok to call other poll methods (for example). At the top level, though, yeah it shouldn't be called.

There's some more stuff later in the issue about how to use this stuff as an end-user. I think the direct answer is: To do something like this, you need to have an executor on which poll can run. This part of the tokio guide explains in more detail how to do this. I'm honestly still unclear on this myself, but maybe it makes sense to provide a method for running functions on our event loop. I believe this would allow you to run similar sort of code easily.

aatxe commented 7 years ago

Here is a very different answer: The goal here is to presumably process messages from Telegram and IRC in an interleaved fashion, right? You can do this at a much higher level. Assuming you have two streams, irc_stream and telegram_stream, you should be able to do something like:

irc_stream.merge(telegram_stream).for_each(|pair| match pair {
  MergedItem::First(irc_msg) => process_irc(irc_msg),
  MergedItem::Second(telegram_msg) => process_telegram(telegram_msg),
  MergedItem::Both(irc_msg, telegram_msg) => {
    process_irc(irc_msg);
    process_telegram(telegram_msg)
  }
}).wait().unwrap()
Yatekii commented 7 years ago

Thanks a very lot for your extended help! I will try and read more on streams, but it doesn't make any sense to me to have a poll function which isn't supposed to be called. Maybe I am missunderstanding it and I am actually supposed to get sample by sample in a different fashion.

The merge of the two streams would indeed work as you described my usecase perfectly. Still, if I had some other functionality, which couldn't be merged into the stream that wouldn't work anymore :( I will try and press upon further explanation a little more in #rust too :)

Also for now I guess I will go with the merge.

Thanks a lot! Also, is there an IRC channel where to find you?

Best regards Yatekii

Sent from my LGE Nexus 5X using FastHub

aatxe commented 7 years ago

You can find me in a few places on IRC. The easiest ones are probably Freenode (where I am aatxe) and Mozilla (where I am awe).