zeromq / czmq

High-level C binding for ØMQ
czmq.zeromq.org
Mozilla Public License 2.0

ZMQ_STREAM support in czmq #2192

Open sphaero opened 3 years ago

sphaero commented 3 years ago

I'm not sure if czmq supports the stream socket fully. I'm trying to add a test for it in zsock.c:

#ifdef ZMQ_STREAM
    zsock_t *streamrecv = zsock_new_stream("@tcp://127.0.0.1:1234");
    assert (streamrecv);

    zsock_t *streamsender = zsock_new_stream(">tcp://127.0.0.1:1234");
    assert (streamsender);

    zmsg_t *connectmsg = zmsg_recv(streamrecv); // we receive a message on connect
    zframe_t *id = zmsg_pop(connectmsg);        // first frame is id
    assert (id);
    assert (zframe_size(id) == 5);              // the id is 5 bytes
    zframe_t *empty = zmsg_pop(connectmsg);     // second frame is empty
    assert (empty);
    assert (zframe_size(empty) == 0);

    // send a http request, first get our routing id
    uint32_t rid = zsock_routing_id (streamsender);
    zmsg_t *request = zmsg_new();
    assert (request);
    rc = zmsg_addmem(request, &rid, sizeof(uint32_t));
    assert (rc == 0);
    rc = zmsg_addstr(request, "GET /\n\n");
    assert (rc == 0);
    rc = zmsg_send(&request, streamsender);
    assert (rc == 0);

    // receive the request
    zmsg_t * recvreq = zmsg_recv(streamrecv);
    assert (recvreq);
    zframe_t* ridframe = zmsg_pop(recvreq); // first frame is routing id
    uint32_t *rid2 = (uint32_t *)zframe_data(ridframe);
    assert (*rid2 == rid);
    char *httpreq = zmsg_popstr(recvreq);
    assert (streq(httpreq, "GET /\n\n"));
#endif

This asserts on the send: rc = zmsg_send(&request, streamsender);

It's doing the connect but requesting the routing id is apparently only supported for the SERVER socket?

Any thoughts, hints, or suggestions?

P.S. Cleanup is missing in the code.

sphaero commented 3 years ago

It's a bit fuzzy, but the routing_id should be used. However, the routing_id methods in czmq only deal with SERVER sockets, so this has to be done manually for now. We can get the socket's routing_id as follows:

byte *rid = (byte *)zsock_identity(streamsender);

The ZMQ_IDENTITY option is deprecated in favour of ZMQ_ROUTING_ID. However, the routing_id methods in czmq are quite different from zsock_identity (which is generated code?). zsock_identity returns the value straight from zmq_getsockopt, whereas zsock_routing_id returns a cached uint32_t which is only set for SERVER sockets on brecv.
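A rough sketch of why a frame built from that uint32_t probably can't address a STREAM peer. It is plain C with no czmq calls. It assumes the peer id is the 5-byte blob that the connect message carries in the test above, and that a frame only addresses the peer when its bytes are identical to that id. frame_matches_id, uint32_frame_matches and the bytes in peer_id are made up for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical helper, not a czmq or libzmq function: true only if the
// frame holds exactly the same bytes as the peer id.
static bool
frame_matches_id (const void *frame, size_t frame_size,
                  const void *id, size_t id_size)
{
    assert (frame != NULL && id != NULL);
    return frame_size == id_size && memcmp (frame, id, id_size) == 0;
}

// Made-up 5-byte id, standing in for the one seen on connect.
static const unsigned char peer_id [5] = { 0x00, 0x6B, 0x8B, 0x45, 0x67 };

// A frame built from a uint32_t is 4 bytes long, so it cannot equal a
// 5-byte id, whatever value the uint32_t holds.
static bool
uint32_frame_matches (uint32_t rid)
{
    return frame_matches_id (&rid, sizeof rid, peer_id, sizeof peer_id);
}
```

So even if zsock_routing_id returned something meaningful here, the 4-byte frame would still be one byte short of the id.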

So to work around this manually, we need to call the zmq methods directly to retrieve the socket's id:

uint8_t ridr [256];
size_t rid_size = 256;
rc = zmq_getsockopt (zsock_resolve(streamsender), ZMQ_IDENTITY, ridr, &rid_size);
assert (rc == 0);

This is almost identical to calling zsock_identity. However, zsock_identity returns a char *, so you cannot know the size.
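To illustrate the size problem, here is a small sketch in plain C (no czmq calls). It assumes an id that is 5 bytes long and starts with a zero byte, which as far as I can tell is what libzmq generates for peers that don't set their own id. example_id and id_len_as_string are hypothetical names and the byte values are made up.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Made-up 5-byte id with a leading zero byte (assumed layout).
static const unsigned char example_id [5] = { 0x00, 0x6B, 0x8B, 0x45, 0x67 };

// Length of the id as seen by code that only has a char * and treats
// it as a C string. Only safe here because the first byte is zero;
// a binary id is not guaranteed to be NUL-terminated.
static size_t
id_len_as_string (const unsigned char *id)
{
    assert (id != NULL);
    return strlen ((const char *) id);
}
```

With such an id, id_len_as_string (example_id) stops at the first byte, while sizeof example_id is 5. That is why the size has to come from the rid_size that zmq_getsockopt fills in.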

The routing_id methods in czmq are draft, so it could be an option to change them to be more in sync with libzmq's documentation. But reading the code, it seems czmq is trying to make the routing_id transparent so users are not bothered with it.

sphaero commented 3 years ago

For now I created #2193.

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity for 90 days. It will be closed if no further activity occurs within 21 days. Thank you for your contributions.