msune opened 7 years ago
Sorry, but we can't break zmq_poll compatibility, as its API is finalised.
But we have a new polling implementation in development, which you can find as zmq_poller. I think that would be a good place to implement this functionality: that API has not yet been declared STABLE, so it can be changed if needed.
https://github.com/zeromq/libzmq/blob/master/include/zmq.h#L585 https://github.com/zeromq/libzmq/blob/master/src/zmq.cpp#L754 https://github.com/zeromq/libzmq/blob/master/src/socket_poller.cpp
@bluca I will have a look at the poller and see if this can be implemented easily. We can keep this issue open for discussion, or you can close it.
Thanks! It's fine to keep this open for discussion. Let us know if you need any help.
I think I might have a simple solution. Right now we have zmq_poller_add, which accepts the events to listen to. We could have an overload that accepts a predicate (function pointer) instead of the events. The predicate would be run when the socket is signalled and would return the events for the socket. This works because ZeroMQ sockets are edge-triggered.
Inside the predicate, the user can implement any logic, including checking whether a specific identity is ready.
We should also add an API function or socket option to retrieve the status of a specific identity inside the ROUTER socket.
API signature can be:
ZMQ_EXPORT int zmq_poller_add_extended (void *poller, void *socket, void *user_data, short events, short (*get_events)(void* s, void* user_data));
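A minimal sketch of such a predicate, assuming the `get_events` callback type from the signature above. `zmq_poller_add_extended` does not exist in libzmq, so instead of a real ROUTER socket the sketch passes a hypothetical in-memory peer table (`mock_peer_t`, `predicate_ctx_t`) through `user_data`; the `SKETCH_POLL*` constants only mirror the values of `ZMQ_POLLIN`/`ZMQ_POLLOUT`.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Same values as ZMQ_POLLIN / ZMQ_POLLOUT in zmq.h. */
#define SKETCH_POLLIN 1
#define SKETCH_POLLOUT 2

/* Hypothetical stand-in for the per-peer state kept inside a ROUTER socket. */
typedef struct {
    const char *identity;
    short events; /* SKETCH_POLLIN and/or SKETCH_POLLOUT */
} mock_peer_t;

/* Hypothetical user_data: the known peers plus the identity we care about. */
typedef struct {
    const mock_peer_t *peers;
    size_t peer_count;
    const char *wanted_identity;
} predicate_ctx_t;

/* Has the proposed callback type short (*)(void *s, void *user_data).
   The poller would call it when the socket is signalled; this mock ignores
   the socket argument and answers from user_data alone. */
static short get_events_for_identity (void *s, void *user_data)
{
    const predicate_ctx_t *ctx = (const predicate_ctx_t *) user_data;
    (void) s;
    assert (ctx != NULL);
    for (size_t i = 0; i < ctx->peer_count; ++i) {
        if (strcmp (ctx->peers[i].identity, ctx->wanted_identity) == 0)
            return ctx->peers[i].events;
    }
    return 0; /* unknown identity: report no events */
}
```

Because the predicate only sees what `user_data` gives it, the application decides which identities matter, which is the flexibility (and the genericity) debated below.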
Sounds great! What do you think @msune ?
Hm, isn't this a bit too generic? Can you think of any other useful implementation of such a callback?
And shouldn't the events be passed to the callback as well?
ZMQ_EXPORT int zmq_poller_add_extended (void *poller, void *socket, void *user_data, short events, short (*get_events)(void* s, short events, void* user_data));
You don't need the events, you provide them. If you want to get them, just call getsockopt with ZMQ_EVENTS.
Regarding other implementations, we also have stream and server socket types which can use this extension.
Ok. In the context of this issue, the other API function you mentioned is the critical point.
If this is solved via a socket option, it would need to return the status of all connected peers, since we cannot query a specific peer that way. The required data structure would be somewhat large: an array whose entries each contain an identity (up to 255 bytes) and the peer state.
So maybe a dedicated new API function would be better, such as
ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, void *identity, size_t identity_size);
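To make the size concern concrete, here is a hypothetical model (not libzmq code) of one entry of such an array and of a single-peer lookup in the spirit of `zmq_socket_get_peer_state`. The return convention (event bits, or -1 for an unknown peer) is an assumption, since the proposal does not specify one. Identities are compared with `memcmp` because they are binary and not NUL-terminated.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_POLLOUT 2        /* same value as ZMQ_POLLOUT */
#define SKETCH_MAX_IDENTITY 255 /* ZMQ identities are at most 255 bytes */

/* One entry of the array a socket option would have to return:
   a binary identity plus the peer's state. */
typedef struct {
    unsigned char identity[SKETCH_MAX_IDENTITY];
    size_t identity_size;
    int events;
} sketch_peer_entry_t;

/* Hypothetical model of zmq_socket_get_peer_state: look up one peer by its
   binary identity. Assumed to return the peer's event bits, or -1 if no
   peer with that identity is connected. */
static int sketch_get_peer_state (const sketch_peer_entry_t *table,
                                  size_t count,
                                  const void *identity,
                                  size_t identity_size)
{
    assert (table != NULL || count == 0);
    for (size_t i = 0; i < count; ++i) {
        if (table[i].identity_size == identity_size
            && memcmp (table[i].identity, identity, identity_size) == 0)
            return table[i].events;
    }
    return -1;
}
```

Each entry is larger than 255 bytes on its own, so returning every peer through a socket option costs more than 255 bytes per connected peer, whereas the per-peer function only needs the identity the caller already holds.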
But do we know the relevant identities in the callback function? Or is it vice versa, and we should be able to query the identities of all ready peers in case of an event? Then the poller API could remain unchanged.
You can know the identities in the callback function through the user_data parameter, which can be a pointer to an array or other user-created data.
I like the zmq_socket_get_peer_state method.
(I don't have regular access to email or the internet. I will be back on the 27th.)
As @sigiesec says, at first glance I can't see any use case for the callback approach. I see drawbacks to this approach:
I am on my phone, so I can't check the poller APIs easily. What I had in mind originally was something like:
ZMQ_EXPORT int zmq_poller_extended(/*Regular parameters of poller*/, zmq_peer_state_t* peer_states, uint32_t peer_state_size);
where you would typically have peer_states on the stack, sized for a reasonable MAX number of peers (clearly application specific). zmq_poller_extended should therefore somehow (return value?) notify the user if peer_state_size was not sufficient to store the states of all peers.
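One way to answer the "(return value?)" question, sketched with hypothetical names (`sketch_peer_state_t`, `sketch_fill_peer_states`) since the contents of `zmq_peer_state_t` were never specified: follow the `snprintf` convention, copy as many states as fit, and always return the total number of peers, so a result larger than the capacity signals truncation.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical layout; the proposal never defined zmq_peer_state_t. */
typedef struct {
    int peer_index; /* stand-in for the peer's identity */
    short events;
} sketch_peer_state_t;

/* Copies at most `capacity` states into the caller's (typically stack)
   array and returns the total number of peers, snprintf-style. A return
   value greater than `capacity` tells the caller the array was too small. */
static size_t sketch_fill_peer_states (const short *all_events,
                                       size_t peer_count,
                                       sketch_peer_state_t *out,
                                       size_t capacity)
{
    assert (out != NULL || capacity == 0);
    for (size_t i = 0; i < peer_count && i < capacity; ++i) {
        out[i].peer_index = (int) i;
        out[i].events = all_events[i];
    }
    return peer_count;
}
```

With this convention the caller can retry with a larger (heap) array when the stack buffer turns out to be too small, without a separate "how many peers?" call.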
Along with this, there is a need for a zmq_read_extended() call which allows specifying the identity and reading from that particular incoming queue.
I think it would also be useful to have the following, though it is definitely not strictly necessary with this approach:
ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, void *identity, size_t identity_size);
Thoughts?
@msune I am now at a point in our use case where this is urgently needed as well.
I am not sure if we really need to change the poller API at all. At least for our use, zmq_socket_get_peer_state would be enough.
If zmq_poller_wait returns a ROUTER/MANDATORY socket with ZMQ_POLLOUT set, then we know that it is writable for at least one peer. In our case, we have a message for some particular peers, and we could call zmq_socket_get_peer_state for each of them, and if that peer's state is writable, we send the message, otherwise we skip it.
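A minimal model of that send-or-skip loop, assuming a hypothetical `zmq_socket_get_peer_state` that reports `ZMQ_POLLOUT` per peer. The peer table below is plain in-memory data, not a libzmq socket, and incrementing `messages_sent` stands in for the two `zmq_send` calls (identity frame, then body).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_POLLOUT 2 /* same value as ZMQ_POLLOUT */

/* Hypothetical model of a ROUTER/MANDATORY socket's peers. */
typedef struct {
    const char *identity;
    short events;      /* what zmq_socket_get_peer_state would report */
    int messages_sent; /* counts simulated zmq_send calls */
} sketch_peer_t;

static sketch_peer_t *find_peer (sketch_peer_t *peers, size_t n,
                                 const char *identity)
{
    for (size_t i = 0; i < n; ++i)
        if (strcmp (peers[i].identity, identity) == 0)
            return &peers[i];
    return NULL;
}

/* After the poller reported POLLOUT on the router (so at least one peer is
   writable), check each target individually: send to writable peers and
   skip the rest. Returns the number of targets skipped. */
static size_t send_to_writable_targets (sketch_peer_t *peers, size_t n,
                                        const char *const *targets,
                                        size_t target_count)
{
    size_t skipped = 0;
    assert (targets != NULL || target_count == 0);
    for (size_t t = 0; t < target_count; ++t) {
        sketch_peer_t *p = find_peer (peers, n, targets[t]);
        if (p != NULL && (p->events & SKETCH_POLLOUT))
            p->messages_sent++; /* real code: zmq_send identity, then body */
        else
            skipped++;          /* unknown or full: retry on a later wakeup */
    }
    return skipped;
}
```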
I will try to translate this into libzmq API code.
Here is a test case that could not be easily implemented without zmq_socket_get_peer_state, but should work with it:
void test_get_peer_state ()
{
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    assert (router);
    int rc = zmq_bind (router, "tcp://127.0.0.1:*");
    assert (rc == 0);
    rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
    assert (rc == 0);
    int mandatory = 1;
    rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory,
                         sizeof (mandatory));
    assert (rc == 0);

    // Create dealer called "X" and connect it to our router
    void *dealer1 = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer1);
    rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, "X", 1);
    assert (rc == 0);
    rc = zmq_connect (dealer1, my_endpoint);
    assert (rc == 0);

    // Create dealer called "Y" and connect it to our router
    void *dealer2 = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer2);
    rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, "Y", 1);
    assert (rc == 0);
    rc = zmq_connect (dealer2, my_endpoint);
    assert (rc == 0);

    // Get message from dealer to know when connection is ready
    char buffer[255];
    rc = zmq_send (dealer1, "Hello", 5, 0);
    assert (rc == 5);
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 1);
    assert (buffer[0] == 'X');

    void *poller = zmq_poller_new ();
    assert (poller);
    rc = zmq_poller_add (poller, router, NULL, ZMQ_POLLOUT);
    assert (rc == 0);
    rc = zmq_poller_add (poller, dealer1, NULL, ZMQ_POLLIN);
    assert (rc == 0);

    const size_t count = 10000;
    const size_t event_size = 2;
    zmq_poller_event_t events[event_size];
    for (size_t iterations = 0; iterations < count; ++iterations) {
        // zmq_poller_wait_all returns how many entries of events were filled
        int n_events =
          zmq_poller_wait_all (poller, events, (int) event_size, -1);
        if (n_events == -1)
            break;
        for (int i = 0; i < n_events; ++i) {
            if (events[i].socket == router) {
                // zmq_socket_get_peer_state is the proposed, not yet existing, API
                if (zmq_socket_get_peer_state (router, "X", 1) & ZMQ_POLLOUT) {
                    rc = zmq_send (router, "X", 1, ZMQ_SNDMORE);
                    assert (rc == 1);
                    rc = zmq_send (router, "Hello", 5, 0);
                    assert (rc == 5);
                }
                if (zmq_socket_get_peer_state (router, "Y", 1) & ZMQ_POLLOUT) {
                    rc = zmq_send (router, "Y", 1, ZMQ_SNDMORE);
                    assert (rc == 1);
                    rc = zmq_send (router, "Hello", 5, 0);
                    assert (rc == 5);
                }
            }
            if (events[i].socket == dealer1) {
                rc = zmq_recv (dealer1, buffer, 255, ZMQ_DONTWAIT);
                assert (rc == 5);
            }
            // never read from dealer2, so the router pipe for dealer2 becomes full eventually
        }
    }

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);
    rc = zmq_close (router);
    assert (rc == 0);
    rc = zmq_close (dealer1);
    assert (rc == 0);
    // dealer2 must be closed too, otherwise zmq_ctx_term blocks
    rc = zmq_close (dealer2);
    assert (rc == 0);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}
What do you think? Does this make sense? Obviously, I did not run this, since zmq_socket_get_peer_state is not yet implemented.
I started work based on this in my branch https://github.com/sigiesec/libzmq/tree/fix-issue-2623
@sigiesec in the approach (A) I was proposing, the state of each peer would be filled in the extended_peer_state object that is passed to zmq_poller_extended().
Another option (B) I've been thinking about, a bit along the lines of zmq_socket_get_peer_state() but without the callback overhead, would be, once the socket (e.g. router) has events and returns from a regular zmq_poller(), to call zmq_socket_get_peer_states(), which can return the state for a list of peers (ZMQ identities) instead of a single peer.
I still like (A) better, since in a single call you have everything you need to continue RX/TXing.
In both approaches, you also need to implement zmq_recv_identity() in order for the RX path to work correctly.
@others; thoughts?
I plan to work a bit on it by the end of the week. I will let you know, and possibly reference some branch for RFC here.
@msune Maybe you can also have a look at my test case https://github.com/sigiesec/libzmq/blob/bf6f4ac2fdd2b0aade801e48c1d3d9ab55779bde/tests/test_router_mandatory.cpp#L32
I can imagine that your use case is related but different, and requires additional functions. I am really interested in what your test case will look like :)
The test is not working yet because, contrary to my assumption, any number of messages can be sent to dealer2, even though they are never read. I think this is related to my misunderstanding of ZMQ_RCVHWM (see #2724).
Actually, the test case works with inproc sockets, but not with tcp sockets.
@msune @somdoron Any thoughts on the additions in #2730? Do you think there is the need for more changes on the API side?
The problem is that you are going to get high CPU usage. Polling a router for POLLOUT will always return if at least one peer is ready, so you get high CPU usage whenever any peer is ready, and that might not be the peer you are interested in. This is why I suggested the extension to zmq_poller.
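A toy model (plain C, no libzmq) of the concern above: socket-wide POLLOUT on a ROUTER is reported whenever any peer has room, so a wait loop with an infinite timeout keeps returning even when the only peer the application has data for is blocked. The function names are illustrative only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Socket-wide POLLOUT as the poller would report it:
   true as soon as ANY peer has room in its pipe. */
static bool socket_pollout (const bool *peer_writable, size_t n)
{
    for (size_t i = 0; i < n; ++i)
        if (peer_writable[i])
            return true;
    return false;
}

/* Simulates `rounds` iterations of a wait loop with an infinite timeout
   where the application only has data for peer `wanted`. Each round in
   which the wait returns but `wanted` is not writable is a wasted wakeup:
   the thread spins instead of sleeping. */
static size_t count_wasted_wakeups (const bool *peer_writable, size_t n,
                                    size_t wanted, size_t rounds)
{
    size_t wasted = 0;
    assert (wanted < n);
    for (size_t r = 0; r < rounds; ++r) {
        if (!socket_pollout (peer_writable, n))
            break; /* the wait would block here, so no spinning */
        if (!peer_writable[wanted])
            wasted++;
    }
    return wasted;
}
```

With peer "X" writable and peer "Y" full, every one of 1000 simulated waits for "Y" returns without useful work; that busy loop is what per-peer readiness is meant to avoid.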
@somdoron good point :)
Sorry for not coming back to this before, I had been busy.
Agreed with @somdoron. I think the underlying problem is of much wider scope.
Give me 24h (tomorrow EOB) so that I can come up with a draft of the APIs along the lines of the last comment, as I see them fitting the entire per-peer problem (both RX POLLIN and TX POLLOUT), based on my experience with our different use cases.
So, you can find some preliminary, incomplete work here, which I think is sufficient to trigger discussion. I will continue tomorrow, but I would appreciate early feedback before going much further.
This branch is frozen for discussion:
https://github.com/msune/libzmq/commits/poller_peer_support_discuss
In particular, this commit:
https://github.com/msune/libzmq/commit/1c386bf4eb3a1d8ab24bd96006b17f0df123b412
The 4 key additional methods are:
ZMQ_EXPORT int zmq_poller_add_peer (void *poller, void *socket, void *zid,
                                    int zid_len, void *user_data,
                                    short events);
ZMQ_EXPORT int zmq_poller_modify_peer (void *poller, void *socket, void *zid,
                                       int zid_len, short events);
ZMQ_EXPORT int zmq_poller_remove_peer (void *poller, void *socket, void *zid,
                                       int zid_len);
ZMQ_EXPORT int zmq_recv_peer (void *s, void *zid, int zid_len, void *buf,
                              size_t len, int flags);
I think the usage is clear. One can use the regular poller methods to add the entire socket and/or use the _peer methods to add poller items for specific ZMQ identities.
To read from a specific ZMQ identity, use zmq_recv_peer().
There is probably a better name than peer (identity?). Open to suggestions.
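To illustrate how per-peer items could be evaluated, here is a hypothetical in-memory model (not the code in the branch) of `zmq_poller_add_peer`-style registration and a non-blocking readiness check; `sketch_peer_state_t` stands in for the socket's internal per-pipe state, and all `sketch_*` names are made up for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_POLLIN 1 /* same values as ZMQ_POLLIN / ZMQ_POLLOUT */
#define SKETCH_POLLOUT 2
#define SKETCH_MAX_ITEMS 8

/* Hypothetical per-peer poll item, modelled on the (zid, zid_len,
   user_data, events) parameters of the proposed zmq_poller_add_peer. */
typedef struct {
    const void *zid;
    int zid_len;
    void *user_data;
    short events;
} sketch_peer_item_t;

typedef struct {
    sketch_peer_item_t items[SKETCH_MAX_ITEMS];
    int count;
} sketch_poller_t;

/* Hypothetical snapshot of one peer's pipe state inside the socket. */
typedef struct {
    const void *zid;
    int zid_len;
    short state;
} sketch_peer_state_t;

static int sketch_poller_add_peer (sketch_poller_t *p, const void *zid,
                                   int zid_len, void *user_data, short events)
{
    assert (p != NULL);
    if (p->count == SKETCH_MAX_ITEMS)
        return -1;
    sketch_peer_item_t item = {zid, zid_len, user_data, events};
    p->items[p->count++] = item;
    return 0;
}

static short lookup_state (const sketch_peer_state_t *states, int n,
                           const void *zid, int zid_len)
{
    for (int i = 0; i < n; ++i)
        if (states[i].zid_len == zid_len
            && memcmp (states[i].zid, zid, (size_t) zid_len) == 0)
            return states[i].state;
    return 0; /* unknown peer: nothing ready */
}

/* Non-blocking check: stores the user_data of every registered item whose
   requested events are satisfied by its own peer's state and returns how
   many were stored. Peers that were never added cannot make it succeed. */
static int sketch_poller_check (const sketch_poller_t *p,
                                const sketch_peer_state_t *states,
                                int n_states, void **ready, int ready_cap)
{
    int n = 0;
    for (int i = 0; i < p->count && n < ready_cap; ++i) {
        const sketch_peer_item_t *it = &p->items[i];
        if (lookup_state (states, n_states, it->zid, it->zid_len) & it->events)
            ready[n++] = it->user_data;
    }
    return n;
}
```

A poller that only registered "Y" reports nothing while "X" is writable, which is the property that addresses the high-CPU concern raised earlier.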
A lot:
- poller_peer() methods to check that the socket is multi-peer
- zmq_recv_peer() and support for ZMQ_PEER_EVENTS in getsockopt() on sockets with multi-peer support. I dislike using getsockopt() with an additional struct zmq_gso_peer_events_t for ZMQ_PEER_EVENTS; I was following the current approach. I would very much prefer opening an API to the socket base to get that, and exposing it through regular C bindings. Comments?
- zmq::socket_poller_t::zero_trail_events() and zmq::socket_poller_t::check_events()
I like the API. It is a real boost for zmq_poller. I like the name peer much better than identity, IMHO. I'm very open to a C-level API; however, binding developers like getsockopt. Do both, in my opinion.
I'm not sure I understand the last question...
I suggest opening a PR with WIP at the beginning of the title. We can comment there...
Good stuff! I agree with Doron on the API - having also getsockopt in addition will make the binding folks very happy.
@somdoron Opened PR. Feel free to comment code there.
@somdoron wrote:
I'm not sure I understand the last question...
Do you mean?
I can't understand why poller and user items are different (but it is late here...). Is there any good reason for doing all this copy, ordering and mangling?
yes...
@msune Looks good "from afar", but at least some basic test case would be really good ASAP to see how the API is used.
(Related to PR #2622; partial fix, API and ABI compatible)
I am copying the description of the problem here from #2622:
Some possible solutions come to mind (obviously, all of them break API and ABI unless implemented in parallel):
1) Change zmq_pollitem_t to accept a list of identities (ZMQ_IDENTITY values) for ZMQ_POLLOUT as an optional parameter. If none is specified, the current behaviour should be maintained.
2) Create zmq_poll-able peer zmq_sock_t objects (fake), and open APIs to get those zmq_sock_t for a given ZMQ_IDENTITY, to be placed as regular zmq_pollitem_t entries.
I tend to think 1) is a bit easier to implement and simpler to use. I would like to know your opinions. We could try to work a bit on it, either on zmq_poll() itself or on a parallel version. But I want to make sure there is a chance of it being accepted before committing any effort. Thoughts?
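A sketch of how option 1) could compute `revents`, using a hypothetical `sketch_pollitem_ext_t` rather than the real `zmq_pollitem_t`: without an identity list the result matches today's socket-wide behaviour, and with one only the listed peers count towards `ZMQ_POLLOUT` (POLLIN stays socket-wide). The peer table is in-memory data, not a libzmq socket.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_POLLIN 1 /* same values as ZMQ_POLLIN / ZMQ_POLLOUT */
#define SKETCH_POLLOUT 2

/* Hypothetical peer state inside a ROUTER socket. */
typedef struct {
    const char *identity;
    short events;
} sketch_peer_t;

/* Hypothetical extension of zmq_pollitem_t for option 1). */
typedef struct {
    short events;                  /* requested events, as in zmq_pollitem_t */
    const char *const *identities; /* NULL keeps the current behaviour */
    size_t identity_count;
} sketch_pollitem_ext_t;

static short peer_events (const sketch_peer_t *peers, size_t n,
                          const char *identity)
{
    for (size_t i = 0; i < n; ++i)
        if (strcmp (peers[i].identity, identity) == 0)
            return peers[i].events;
    return 0;
}

/* Computes revents for one item: socket-wide readiness, except that with an
   identity list POLLOUT is only reported if a listed peer is writable. */
static short sketch_revents (const sketch_pollitem_ext_t *item,
                             const sketch_peer_t *peers, size_t n)
{
    assert (item != NULL);
    short ready = 0;
    for (size_t i = 0; i < n; ++i)
        ready = (short) (ready | peers[i].events);
    if (item->identities != NULL) {
        short listed = 0;
        for (size_t i = 0; i < item->identity_count; ++i)
            listed = (short) (listed
                              | peer_events (peers, n, item->identities[i]));
        ready = (short) ((ready & ~SKETCH_POLLOUT) | (listed & SKETCH_POLLOUT));
    }
    return (short) (ready & item->events);
}
```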