ropensci / rzmq

R package for ZMQ
https://docs.ropensci.org/rzmq

enable polling #9

Open glycerine opened 10 years ago

glycerine commented 10 years ago

I'd like to be able to pass ZMQ_DONTWAIT to the socket->recv() call in receiveSocket, so that I can poll without blocking when no message is available.

Since R is not very thread friendly, non-blocking polling is very useful for implementing a server.

Would you consider adding the ability to pass flags to socket->recv()? As it is, the flags_ parameter always takes its default of zero.

Thanks!

Jason

In interface.cpp, inside rzmq.tgz:

SEXP receiveSocket(SEXP socket_) {
  SEXP ans;
  bool status(false);
  zmq::message_t msg;
  zmq::socket_t* socket = reinterpret_cast<zmq::socket_t*>(checkExternalPointer(socket_,"zmq::socket_t*"));
  if(!socket) { REprintf("bad socket object.\n");return R_NilValue; }
  try {
    status = socket->recv(&msg); // line 282   <<<<<<<<<<<<<<<<<< here
  } catch(std::exception& e) {
    REprintf("%s\n",e.what());
  }
  if(status) {
    PROTECT(ans = allocVector(RAWSXP,msg.size()));
    memcpy(RAW(ans),msg.data(),msg.size());
    UNPROTECT(1);
    return ans;
  }

  return R_NilValue;
}
in zmq.hpp:

    class socket_t
    {
 ...
        inline bool recv (message_t *msg_, int flags_ = 0)   // line 444 <<<<<<<<<<<<<<< here
        {
            int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
            if (nbytes >= 0)
                return true;
            if (zmq_errno () == EAGAIN)
                return false;
            throw error_t ();
        }

    private:
        void *ptr;
        void *ctxptr;

        socket_t (const socket_t&) ZMQ_DELETED_FUNCTION;
        void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;
    };

memeplex commented 10 years ago

I need this too. A pull request for polling was accepted and merged a year ago, but there is no mention of polling in the cran documentation pdf, and the github readme for the project states there is no polling support. So I'm not sure what the current status is.

glycerine commented 10 years ago

I cite the current code above (see the line marked <<<<<<<<<<<<<<< here, in receiveSocket(), line 282). There is no way at present to set the flag from R.
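To make the request concrete, here is a minimal sketch of what forwarding a flag could look like. It does not use libzmq or the R headers: FakeSocket, FAKE_DONTWAIT, Received, receive_bytes, drain and the dont_wait parameter are all hypothetical names made up for illustration, and the fake socket only imitates the recv() contract quoted from zmq.hpp (true on a message, false when a non-blocking read finds nothing). It is not the patch itself.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for ZMQ_DONTWAIT; the real constant comes from zmq.h.
const int FAKE_DONTWAIT = 1;

// Hypothetical in-memory socket imitating the recv() contract quoted from
// zmq.hpp: true when a message was read, false when a non-blocking read
// finds nothing (the EAGAIN case). Not part of rzmq or libzmq.
class FakeSocket {
public:
    void push(const std::string& payload) { queue_.push_back(payload); }

    bool recv(std::string* msg, int flags = 0) {
        if (queue_.empty()) {
            // A real blocking recv would wait here; this stand-in only
            // supports the non-blocking path on an empty queue.
            assert(flags & FAKE_DONTWAIT);
            return false;
        }
        *msg = queue_.front();
        queue_.pop_front();
        return true;
    }

private:
    std::deque<std::string> queue_;
};

// Result of one receive attempt. ok == false means no message was
// available, the case where receiveSocket returns R_NilValue.
struct Received {
    bool ok;
    std::vector<unsigned char> bytes;
};

// Sketch of receiveSocket with a caller-supplied flag instead of the
// default of zero. dont_wait is a hypothetical parameter name.
Received receive_bytes(FakeSocket& socket, bool dont_wait) {
    Received result;
    result.ok = false;
    std::string msg;
    const int flags = dont_wait ? FAKE_DONTWAIT : 0;
    if (socket.recv(&msg, flags)) {
        result.ok = true;
        result.bytes.assign(msg.begin(), msg.end());
    }
    return result;
}

// Poll up to max_polls times without blocking and count the messages seen.
// A server would do its other work whenever a poll comes back empty.
std::size_t drain(FakeSocket& socket, std::size_t max_polls) {
    std::size_t handled = 0;
    for (std::size_t i = 0; i < max_polls; ++i) {
        if (receive_bytes(socket, true).ok) {
            ++handled;
        }
    }
    return handled;
}
```

With this shape, receive_bytes(socket, true) on an empty socket comes back immediately with ok == false, so a single-threaded loop can keep doing other work between polls. In the real package the same idea would presumably mean threading one extra argument from R through to socket->recv(&msg, flags).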

armstrtw commented 10 years ago

That should be an easy change.

I'll try to patch by this weekend.

The original polling code for this was accepted w/out any documentation contribution. I haven't tested it myself.

glycerine commented 10 years ago

Thanks Whit! That would be great. Do you know how the CRAN release works? It would be great to be able to update rzmq there. I'm happy to help test any new patch if you'd like.

Also, thank you @memeplex, for pointing out the poll.socket() implementation available from the current github head (but not avail in CRAN, yet). I prefer using receiveSocket() because that is what I'm used to, and it is much simpler. Although it is good to know about poll.socket().

armstrtw commented 10 years ago

Yes, I can do the CRAN release. I'm still the package maintainer.

I think it makes sense to get these updates in place first, so if you want to submit a patch for that go ahead. Otherwise, I think I'll have some bandwidth this weekend to work on it.

glycerine commented 10 years ago

Agreed. Better to patch first, then release. I'll try to put a patch together for your review.

glycerine commented 10 years ago

pull request available: https://github.com/armstrtw/rzmq/pull/10