erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0
900 stars 194 forks source link

zmq::poll: Socket operation on non-socket #189

Closed ghost closed 7 years ago

ghost commented 7 years ago

I use the nix crate to create a pipe, then create a socket by passing the address of pipe.0 as the parameter to zmq::Socket::from_raw. At runtime I get the error: zmq::poll: Socket operation on non-socket

Is anything wrong with my program?

extern crate nix;
extern crate zmq;

use std::io::{self, Write};
use std::os::raw::c_void;
use std::os::unix::io::RawFd;
use std::process;
use nix::sys::signal;

// File descriptor the signal handler writes a byte to. It starts at -1 and is
// assigned by `s_catch_signals` before the handler is installed; the handler
// only reads it.
static mut S_FD: RawFd = -1;

extern "C" fn s_signal_handler(_: i32) {
    unsafe {
        let rc = nix::unistd::write(S_FD, b" ");
        match rc {
            Ok(_) => {}
            Err(_) => {
                println!("Error while writing to self-pipe.");
                let _ = io::stdout().flush();
            }
        }
    }
}

/// Stores `fd` in `S_FD` and installs `s_signal_handler` for SIGINT and
/// SIGTERM.
///
/// Panics if either `sigaction` call fails.
fn s_catch_signals(fd: RawFd) {
    // Publish the descriptor before any handler can run.
    unsafe {
        S_FD = fd;
    }

    let handler = signal::SigHandler::Handler(s_signal_handler);
    let action = signal::SigAction::new(handler,
                                        signal::SaFlags::empty(),
                                        signal::SigSet::empty());

    // Same action for both signals, registered in the original order.
    for sig in [signal::SIGINT, signal::SIGTERM].iter() {
        unsafe {
            signal::sigaction(*sig, &action).unwrap();
        }
    }
}

fn main() {
    let context = zmq::Context::new();
    let socket = context.socket(zmq::REP).unwrap();
    assert!(socket.bind("tcp://*:5555").is_ok());

    let mut pipefds = nix::unistd::pipe().unwrap();
    nix::fcntl::fcntl(pipefds.0, nix::fcntl::F_GETFL).unwrap();
    nix::fcntl::fcntl(pipefds.0, nix::fcntl::F_SETFL(nix::fcntl::O_NONBLOCK)).unwrap();

    let pipefds0_ptr: *mut RawFd = &mut pipefds.0;
    let pipefds0 = unsafe { zmq::Socket::from_raw(pipefds0_ptr as *mut c_void) };

    s_catch_signals(pipefds.1);

    let items = &mut [pipefds0.as_poll_item(zmq::POLLIN),
                      socket.as_poll_item(zmq::POLLIN)];

    loop {
        let rc = zmq::poll(items, -1);
        match rc {
            Ok(v) => {
                assert!(v >= 0);
                if v == 0 {
                    continue;
                }
                if items[0].is_readable() {
                    let buffer = &mut [0; 1];
                    nix::unistd::read(pipefds.0, buffer).unwrap();
                    println!("W: interrupt received, killing server...");
                    break;
                }
                if items[1].is_readable() {
                    let buffer = &mut [0; 255];
                    let rc = socket.recv_into(buffer, zmq::DONTWAIT);
                    match rc {
                        Ok(_) => println!("W: recv"),
                        Err(e) => {
                            if e == zmq::Error::EAGAIN || e == zmq::Error::EINTR {
                                continue;
                            }
                            println!("recv: {}", e);
                            process::exit(1);
                        }
                    }
                }
            }
            Err(e) => {
                if e == zmq::Error::EINTR {
                    continue;
                }
                println!("zmq::poll: {}", e);
                process::exit(1);
            }
        }
    }
}
birkenfeld commented 7 years ago

Don't make a socket out of a raw fd that isn't a socket. Rather, use PollItem::from_fd() to get a poll item.

ghost commented 7 years ago

Thank you @birkenfeld, this issue is solved.