ropensci / rzmq

R package for ZMQ
https://docs.ropensci.org/rzmq
Trying to set ZMQ_CONFLATE within rzmq #56

Open modernresearch opened 3 years ago

modernresearch commented 3 years ago

I'm trying to set option ZMQ_CONFLATE to 1 (http://api.zeromq.org/master:zmq-setsockopt) for a ZMQ_SUB socket.

I didn't see a built-in function to do that, so I tried to add my own. Here's what I did:

  1. Added function to R/zmq.R, following pattern of set.affinity:

    set.conflate <- function(socket, option.value) {
      .Call("set_conflate", socket, option.value, PACKAGE="rzmq")
    }

  2. Added set.conflate to export list in NAMESPACE file.

  3. Added set_conflate function in src/interface.cpp, following same pattern as set_affinity:

    SEXP set_conflate(SEXP socket_, SEXP option_value_) {
      zmq::socket_t* socket = reinterpret_cast<zmq::socket_t*>(checkExternalPointer(socket_, "zmq::socket_t*"));
      if(!socket) { REprintf("bad socket object.\n"); return R_NilValue; }
      if(TYPEOF(option_value_) != INTSXP) { REprintf("option value must be an int.\n"); return R_NilValue; }
      SEXP ans; PROTECT(ans = allocVector(LGLSXP, 1)); LOGICAL(ans)[0] = 1;

      uint64_t option_value(INTEGER(option_value_)[0]);
      try {
        socket->setsockopt(ZMQ_CONFLATE, &option_value, sizeof(uint64_t));
      } catch(std::exception& e) {
        REprintf("%s\n", e.what());
        LOGICAL(ans)[0] = 0;
      }
      UNPROTECT(1);
      return ans;
    }
  4. Added a line to src/interface.h in the extern "C" block:

    SEXP set_conflate(SEXP socket_, SEXP option_value_);

  5. Deleted src/rzmq.so from my prior installation.

  6. Re-installed package from source, verified src/rzmq.so was re-built.

  7. When I tried to use it, it definitely finds the function and tries, but it says the argument is invalid:

> library(rzmq)
> sock = init.socket(init.context(), "ZMQ_SUB")
> connect.socket(sock, "tcp://host:port")
> subscribe(sock, "")
> set.conflate(sock, 1L)
Invalid argument
[1] FALSE

Two questions:

  1. Why would it say 1L is an invalid argument? The docs say it's expecting an integer.
  2. Is there a better way to accomplish setting ZMQ_CONFLATE to 1?
armstrtw commented 3 years ago

Can you push your branch, and drop a link to it here?

Alternatively if you make a PR w/ your changes, we can test.

modernresearch commented 3 years ago

Thanks for the quick response - here's the PR: https://github.com/ropensci/rzmq/pull/57

Let me know if you need anything else!

armstrtw commented 3 years ago

Your PR seems correct.

I think the issue you are seeing w/ 'Invalid argument' is related to your sock object being invalid.

Please check that before calling set.conflate.
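One more candidate cause, offered as an assumption rather than something confirmed in this thread: the zmq_setsockopt documentation lists the option value type of ZMQ_CONFLATE as int, while set_conflate passes a uint64_t together with sizeof(uint64_t). If the library rejects any length other than sizeof(int), that alone could produce "Invalid argument". The sketch below imitates such a length check with a hypothetical helper, set_bool_int_option; it is a stand-in for illustration only, not libzmq code, and try_as_uint64 / try_as_int are made-up names.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in (NOT libzmq code): accepts a boolean option only when
// the caller passes exactly sizeof(int) bytes holding 0 or 1.
int set_bool_int_option(int& target, const void* optval, std::size_t optvallen) {
    if (optval == nullptr || optvallen != sizeof(int)) {
        return EINVAL;  // wrong length, e.g. sizeof(uint64_t)
    }
    int value = 0;
    std::memcpy(&value, optval, sizeof(int));
    if (value != 0 && value != 1) {
        return EINVAL;  // not a boolean value
    }
    target = value;
    return 0;
}

// Mirrors the call shape used in set_conflate: a uint64_t and its size.
int try_as_uint64(int& target) {
    std::uint64_t option_value = 1;
    return set_bool_int_option(target, &option_value, sizeof(option_value));
}

// Same value, passed as a plain int.
int try_as_int(int& target) {
    int option_value = 1;
    return set_bool_int_option(target, &option_value, sizeof(option_value));
}

// Small self-check; assumes a platform where sizeof(uint64_t) != sizeof(int).
void demo_conflate_lengths() {
    int conflate = 0;
    assert(try_as_uint64(conflate) == EINVAL);
    assert(try_as_int(conflate) == 0 && conflate == 1);
}
```

If this is what is happening, declaring option_value as int and passing sizeof(int) in set_conflate would be the change to try. The same documentation also notes that most options only take effect for subsequent bind/connect calls, so setting the option before connect.socket may matter as well.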

Additionally, you can add a simple check after the setsockopt call to see if your cmd was successful.

something along these lines (untested):

    uint64_t option_value_check; // before try block

    // add after the call to setsockopt
    socket->getsockopt(ZMQ_CONFLATE, &option_value_check, sizeof(uint64_t));
    cout << option_value_check << endl;

I don't have time to test locally at this moment, but I can try later this weekend.
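The read-back check suggested above can be sketched in isolation. A real zmq::socket_t needs a live context, so the example uses a hypothetical FakeSocket with simplified setsockopt/getsockopt members; FakeSocket, set_and_confirm and demo_read_back are illustrative names, not part of rzmq or cppzmq. The sketch assumes, as in the C API's zmq_getsockopt, that the length is passed by pointer rather than by value, and that the option is an int.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstring>

// Hypothetical in-memory stand-in for a socket (NOT zmq::socket_t); only the
// shape of the two calls matters here.
struct FakeSocket {
    int conflate = 0;

    int setsockopt(const void* optval, std::size_t optvallen) {
        if (optval == nullptr || optvallen != sizeof(int)) return EINVAL;
        std::memcpy(&conflate, optval, sizeof(int));
        return 0;
    }

    // Length is passed by pointer: in = buffer size, out = bytes written.
    int getsockopt(void* optval, std::size_t* optvallen) const {
        if (optval == nullptr || optvallen == nullptr || *optvallen < sizeof(int)) {
            return EINVAL;
        }
        std::memcpy(optval, &conflate, sizeof(int));
        *optvallen = sizeof(int);
        return 0;
    }
};

// Set the option, read it back, and report whether the stored value matches.
bool set_and_confirm(FakeSocket& socket, int wanted) {
    if (socket.setsockopt(&wanted, sizeof(wanted)) != 0) return false;
    int stored = -1;
    std::size_t stored_len = sizeof(stored);
    if (socket.getsockopt(&stored, &stored_len) != 0) return false;
    return stored_len == sizeof(int) && stored == wanted;
}

// Example use.
void demo_read_back() {
    FakeSocket socket;
    assert(set_and_confirm(socket, 1));
}
```

Inside the package, reporting the value with Rprintf instead of cout would keep the output in the R console.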

modernresearch commented 3 years ago

Thanks, will check it out and follow up