tokio-rs / tokio-core

I/O primitives and event loop for async I/O in Rust
Apache License 2.0

TcpListener::from_listener() doesn't seem to work with listeners created via std::net::TcpListener::from_raw_fd() #241

Open liftoff opened 7 years ago

liftoff commented 7 years ago

Sorry for the lengthy code but here's my example:

https://gist.github.com/liftoff/d15fbb2d472642240af4b0acf2664bae

The pertinent lines are:

let fd_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
let xcb_fd_listener = TcpListener::from_listener(fd_listener, &addr, &h1).unwrap();

Then later I start up the reactor like so:

h1.spawn(xcb_handler);
h1.spawn(webserver);
loop {
    lp.turn(None);
}

The webserver works fine. The xcb_handler does not. It's like the reactor isn't detecting that the underlying fd has data to be read.

If I use mio directly with poll.register(&fd_listener, XCBEVENT, mio::Ready::readable(), mio::PollOpt::edge()).unwrap(); it seems to work fine (fd_listener was made from mio::tcp::TcpListener::from_raw_fd()).

I asked in #rust-beginners but no one was able to figure it out. As I understand it the value of addr passed to tokio_core::net::TcpListener::from_listener() should be ignored if the underlying socket is from an fd but I'm not certain of this. I'm also not certain that I'm going about this the best way. I mean, it seems logical but I can't find any examples of using Tokio to listen for events on an fd.

Any assistance with this is greatly appreciated.

alexcrichton commented 7 years ago

Hm interesting! If you can slim this down that'd be great, and maybe try poking around with strace to see what's going on? There may be some stray syscall that's causing problems.

The address being passed in is only actually used on Windows, it's just required everywhere for a consistent interface.

liftoff commented 7 years ago

I've made some progress here. Something inside of tokio is doing something that results in "Error: Invalid argument (os error 22)" which causes the incoming() method to throw an error instead of getting to the point where it can execute my for_each() stuff.

Here's the strace of that happening:

fcntl(7, F_GETFL)                       = 0x802 (flags O_RDWR|O_NONBLOCK)
fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=4, u64=4}}) = 0
socketpair(AF_LOCAL, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0, [8, 9]) = 0
getrandom("", 0, GRND_NONBLOCK)         = 0
getrandom("\210^\5\236\375\356l\355", 8, GRND_NONBLOCK) = 8
getrandom("-\227\222\303Q\27\35\202", 8, GRND_NONBLOCK) = 8
rt_sigaction(SIGINT, {0x5612416b7a80, [], SA_RESTORER|SA_RESTART|SA_SIGINFO|SA_NOCLDSTOP, 0x7f9327493670}, {SIG_DFL, [], 0}, 8) = 0
epoll_ctl(3, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=6, u64=6}}) = 0
epoll_wait(3, [{EPOLLOUT, {u32=4, u64=4}}, {EPOLLOUT, {u32=6, u64=6}}], 1024, 0) = 2
epoll_wait(3, [], 1024, 0)              = 0
epoll_wait(3, [{EPOLLIN|EPOLLOUT, {u32=4, u64=4}}], 1024, -1) = 1
write(5, "\1", 1)                       = 1
epoll_wait(3, [{EPOLLIN, {u32=4294967295, u64=18446744073709551615}}], 1024, 0) = 1
read(4, "\1", 128)                      = 1
read(4, 0x7ffcd605f668, 128)            = -1 EAGAIN (Resource temporarily unavailable)
accept4(7, 0x7ffcd605def0, 0x7ffcd605dfbc, SOCK_CLOEXEC) = -1 EINVAL (Invalid argument)
write(1, "Error: Invalid argument (os erro"..., 38Error: Invalid argument (os error 22)
) = 38
close(7)

I suspect this is happening because something is trying to use the address of the TcpListener which of course won't work.

alexcrichton commented 7 years ago

Is the file descriptor coming out of xcb actually a TCP listener? Or is it a unix socket? (in which case tokio-uds may work better here)

liftoff commented 7 years ago

The file descriptor coming out of xcb is a Unix Socket. I looked at the tokio-uds package on Github but there was no documentation and no examples so I wasn't sure how to even begin using it. Is there an example somewhere that would match my use case?

liftoff commented 7 years ago

Actually, I'm not sure what the file descriptor points to. It's whatever I get back from xcb::ffi::base::xcb_get_file_descriptor(conn.get_raw_conn()); I assume it's a socket because XCB can work over a network.

alexcrichton commented 7 years ago

Oh the tokio-uds crate is intended to basically be the same as the TCP types in tokio-core (hence the lack of current docs), so you should be able to use mostly the same APIs?

liftoff commented 7 years ago

I was able to finagle the code into using tokio-uds but I get the same exact error, "Error: Invalid argument (os error 22)". Wish it was more descriptive!

liftoff commented 7 years ago

I'm totally lost at this point as to how to get tokio to work like it does with mio. I thought it might be helpful to post the mio equivalent code (which works fine):

https://gist.github.com/liftoff/d6cead19e434871985047769ee3df7a3

The relevant portion is probably this:

loop {
    poll.poll(&mut events, None).unwrap();

    for event in events.iter() {
        match event.token() {
            SERVER => {
                // Accept and drop the socket immediately, this will close
                // the socket and notify the client of the EOF.
                println!("Incoming connection!");
                let _ = server.accept();
                println!("Closing connection");
            }
            XCBEVENT => {
                let start = Instant::now();
                if event.kind().is_readable() {
                    let _ = handle_xcb_event(
                        &conn,
                        filetype,
                        counter.next().unwrap(),
                        damage_event,
                        &seg,
                    );
                }
                let elapsed = start.elapsed();
                println!(
                    "Elapsed: {} ms",
                    (elapsed.as_secs() * 1_000) + (elapsed.subsec_nanos() / 1_000_000) as u64
                );
            }
            _ => unreachable!(),
        }
    }
}

alexcrichton commented 7 years ago

It looks like in the mio code you're ignoring the return value of accept, what happens if you do the same with futures and ignore the error case of the stream?

liftoff commented 7 years ago

I'm not sure what you mean? I'm only calling accept on the actual socket. Not on the XCBEVENT token part.

alexcrichton commented 7 years ago

Right, what Tokio is doing is also calling accept (the same method), but the error is coming out and going into the stream, which is then later processed for sleeping in your example. In the mio code, you're ignoring any error happening in accept.

liftoff commented 7 years ago

How do I do the equivalent in Tokio though?

alexcrichton commented 7 years ago

Probably something like -- listener.incoming().and_then(|x| Ok(()))

liftoff commented 7 years ago

Replacing:

let xcb_handler = xcb_fd_listener.incoming()
        .for_each(move |(_socket, _welcome)| {

...with:

let xcb_handler = xcb_fd_listener.incoming()
        .and_then(move |(_socket, _welcome)| {

Results in compilation errors:

error[E0277]: the trait bound `futures::stream::MapErr<futures::stream::AndThen<std::boxed::Box<futures::Stream<Item=(tokio_uds::UnixStream, std::os::ext::net::SocketAddr), Error=std::io::Error> + std::marker::Send>, [closure@src/main.rs:297:23: 390:10 cloned_conn:_, damage_event:_, root:_, seg:_, filetype:_, width:_, height:_], std::result::Result<(), std::io::Error>>, [closure@src/main.rs:390:20: 393:10]>: futures::Future` is not satisfied
--> src/main.rs:451:12
    |
451 |         h1.spawn(xcb_handler);
    |            ^^^^^ the trait `futures::Future` is not implemented for `futures::stream::MapErr<futures::stream::AndThen<std::boxed::Box<futures::Stream<Item=(tokio_uds::UnixStream, std::os::ext::net::SocketAddr), Error=std::io::Error> + std::marker::Send>, [closure@src/main.rs:297:23: 390:10 cloned_conn:_, damage_event:_, root:_, seg:_, filetype:_, width:_, height:_], std::result::Result<(), std::io::Error>>, [closure@src/main.rs:390:20: 393:10]>`

error: aborting due to previous error

The same thing happens if I replace:

.map_err(|_e| {
        println!("Error: {}", _e);
        ()
    });

...with your and_then() equivalent.

liftoff commented 7 years ago

I have an idea... Maybe the problem is that something is calling accept() on that TcpListener (or UnixListener) when it shouldn't (that will always result in an error). Is there a way I can prevent tokio from calling accept() when I use .incoming()?

alexcrichton commented 7 years ago

Hm, so in any case it sounds like this maybe isn't the best usage of raw file descriptors. Creating a Rust type consumes ownership of the file descriptor, but it looks like the C side of things still holds the file descriptor in this case? It may be best to basically use PollEvented manually instead of trying to shoehorn this into the standard abstractions.
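
To illustrate the ownership point, here is a minimal std-only sketch (Unix only). It assumes a socket pair as a stand-in for the connection that XCB owns; `borrow_fd_and_write` and `demo` are hypothetical names, not part of tokio, mio, or xcb. The borrowed descriptor is wrapped in `ManuallyDrop` so the Rust wrapper never closes a descriptor that someone else still uses:

```rust
use std::io::{self, Read, Write};
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: write through a descriptor this function does not own.
fn borrow_fd_and_write(fd: RawFd, data: &[u8]) -> io::Result<()> {
    // SAFETY (assumed by this sketch): `fd` is an open stream socket that
    // outlives this call. ManuallyDrop suppresses the close-on-drop that a
    // plain `UnixStream::from_raw_fd(fd)` would perform.
    let mut borrowed = ManuallyDrop::new(unsafe { UnixStream::from_raw_fd(fd) });
    borrowed.write_all(data)
}

/// Writes once through the borrowed wrapper and once through the real owner,
/// then returns everything the peer received.
fn demo() -> io::Result<Vec<u8>> {
    // `owner` stands in for the C library that keeps using the descriptor.
    let (mut owner, mut peer) = UnixStream::pair()?;

    borrow_fd_and_write(owner.as_raw_fd(), b"ping")?;

    // The descriptor is still open here because the wrapper never closed it.
    owner.write_all(b"pong")?;

    let mut buf = [0u8; 8];
    peer.read_exact(&mut buf)?;
    Ok(buf.to_vec())
}

fn main() -> io::Result<()> {
    let received = demo()?;
    println!("{}", String::from_utf8_lossy(&received));
    Ok(())
}
```

Without the `ManuallyDrop`, the wrapper would close the descriptor when it goes out of scope, which matches the close(7) at the end of the strace output earlier in the thread.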

0xd34d10cc commented 7 years ago

As far as I know, XCB is a client library for the X Window System server. Maybe you need to create a TcpStream/UdsStream instead of a TcpListener/UdsListener?

liftoff commented 7 years ago

@alexcrichton I have no idea what I'm doing (haha)... Can you point me to a minimal example of a PollEvented implementation I can use as a base? I figure I should just have it call the equivalent poll() function from xcb instead of the usual socket stuff.

@0xd34d10cc How would I use UdsListener instead of TcpStream? I only have examples that use listener.incoming().for_each(). As I understand it the incoming() method just returns a stream? So I could presumably skip the listener part altogether and just... what? I just need an example.

0xd34d10cc commented 7 years ago

@liftoff TcpStream instead of TcpListener if XCB opens a TCP connection, UdsStream instead of UdsListener if XCB opens a UDS connection. That 'listener part' is about 'accepting' connections. As far as I understand, in the case of XCB you don't need to 'accept' connections, but to read events from an already opened connection (opened via xcb::Connection::connect). TcpStream/UdsStream does just that.
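
The listener/stream distinction can be reproduced with the standard library alone. A minimal sketch (Unix only), assuming a socket pair as a stand-in for the already-connected XCB socket; `accept_on_connected_socket` is a hypothetical name. It wraps a connected socket in a listener type and calls accept on it, which is roughly what incoming() ends up doing:

```rust
use std::io;
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::os::unix::net::{UnixListener, UnixStream};

/// Treats one end of an already-connected socket pair as if it were a
/// listening socket and tries to accept a connection on it.
fn accept_on_connected_socket() -> io::Result<()> {
    let (connected, _peer) = UnixStream::pair()?;

    // into_raw_fd transfers ownership, so the descriptor is closed exactly
    // once, by `fake_listener`.
    let fd = connected.into_raw_fd();

    // SAFETY: `fd` is open and exclusively owned by this function.
    let fake_listener = unsafe { UnixListener::from_raw_fd(fd) };

    // The socket never had listen() called on it, so accept should fail.
    fake_listener.accept().map(|_| ())
}

fn main() {
    match accept_on_connected_socket() {
        Ok(()) => println!("accept unexpectedly succeeded"),
        Err(e) => println!("accept failed: {}", e),
    }
}
```

On Linux I would expect the error to be EINVAL, the same "Invalid argument (os error 22)" that accept4 returns in the strace output, though the exact error code is platform-dependent.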

0xd34d10cc commented 7 years ago

Actually, I think tokio cannot be used with XCB that way. XCB handles I/O by itself, so the only thing you want is to be notified when the socket is ready to read. Tokio has a much higher-level API, which cannot provide such information.

liftoff commented 7 years ago

@0xd34d10cc Any recommendations as to how I should proceed then? I was wondering if there was an easy way to just have Tokio repeatedly call poll_for_event() in the background until the program ends.

carllerche commented 7 years ago

@0xd34d10cc Tokio should work fine, it has far more than request / response (despite what the docs might make it seem).

0xd34d10cc commented 7 years ago

@carllerche ah, I've missed that TcpStream provides poll_read method which, if wrapped with Future, can do what @liftoff needs.