zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

pub/sub bugs #43

Closed: ak47xp closed this issue 11 years ago.

ak47xp commented 14 years ago

There are bugs in the v2.0.7 code that result in problems with receiving multi-part PUB/SUB messages as described in the article. The simplest case is a two-part message with the first part containing a topic and the second part containing a payload.

  1. The first received message is lost because the topic part is skipped over in the code that handles a new peer (pgm_receiver.cpp):

// We have to move data to the beginning of the first message.
data += offset;
received -= offset;

The payload part is then treated as the topic part and is discarded by the subscription filter.

  2. If the publisher terminates without waiting for the zmq I/O thread to drain its send queue and then restarts, the topic part of the last message from the old instance may be concatenated with the payload part of the first message from the new instance. The topic part of the new message is ignored, and the payload part is attached to the stale topic part, which apparently can linger in the decoder. Note that this can deliver the payload under the wrong topic if the restarted publisher sends to a topic different from the previous one.

hurtonm commented 14 years ago

Can you please pull from my repository and test? http://github.com/hurtonm/zeromq2/commits/multipart_messages_encoder_fix

ak47xp commented 14 years ago

This seems to have fixed the head-loss issue; however, the concatenation issue is still there.

Receiver program:

#include <iostream>
#include <cstdlib>
#include <cstring>
#include <stdint.h>
#include <zmq.hpp>
#define NUM_MSG         2048
int main (int argc, char* argv[])
{
        int i, num;
        int64_t more = 1;               //  ZMQ_RCVMORE is an int64_t in 0MQ 2.x
        size_t more_size = sizeof(more);

        if (argc < 2) {
                std::cout << "Receiver arguments: <topic> [<count>]" << std::endl;
                return 1;
        }

        num = (argc < 3) ? NUM_MSG : std::atoi(argv[2]);

        zmq::context_t ctx (1);
        zmq::socket_t s (ctx, ZMQ_SUB);
        s.bind ("epgm://eth0;224.10.10.10:5555");

        s.setsockopt (ZMQ_SUBSCRIBE, argv[1], std::strlen(argv[1]));

        std::cout << "Subscribed to [" << argv[1] << "], length " << std::strlen(argv[1]) << std::endl;

        for (i = 0; i < num; i++) {
                zmq::message_t hdr;
                zmq::message_t msg;

                //  First part is the topic; if ZMQ_RCVMORE is set, the payload follows
                s.recv (&hdr);
                s.getsockopt(ZMQ_RCVMORE, &more, &more_size);
                if (more) {
                        s.recv (&msg);
                        std::cout << "Topic [" << ((const char *)hdr.data()) << "] : "
                                  << i << " = " << ((const char *)msg.data())  << std::endl;
                } else
                        std::cout << "Topic [" << argv[1] << "] : "
                                  << i << " header only [" << ((const char *)hdr.data()) << "]" << std::endl;
        }
        return 0;
}

Sender program:

#include <iostream>
#include <sstream>
#include <cstdlib>
#include <cstring>
#include <stdint.h>
#include <zmq.hpp>
#define NUM_MSG         1
int main (int argc, char* argv[])
{
    int     i, num;
    int64_t rate = 10000;   //  ZMQ_RATE is an int64_t (kilobits per second) in 0MQ 2.x

    if (argc < 3) {
        std::cout << "Sender arguments: <topic> <payload> [<count>]" << std::endl;
        return 1;
    }

    num = (argc < 4) ? NUM_MSG : std::atoi(argv[3]);

    zmq::context_t ctx (1);
    zmq::socket_t s (ctx, ZMQ_PUB);
    s.setsockopt (ZMQ_RATE, &rate, sizeof (rate));
    s.connect ("epgm://eth0;224.10.10.10:5555");

    for (i = 0; i < num; i++) {
        std::ostringstream oss;
        oss << argv[2] << "[" << i << "]";

        //  Topic part: zero-copy from argv (no free function), NUL included
        zmq::message_t hdr (argv[1], std::strlen(argv[1]) + 1, NULL);
        //  Payload part: "<payload>[i]" plus the terminating NUL
        zmq::message_t msg (oss.str().size() + 1);
        std::strcpy((char*)msg.data(), oss.str().c_str());

        std::cout << "Sending '" << ((const char*)msg.data()) <<
                     "' to [" << ((const char*)hdr.data()) << "]" << std::endl;
        s.send (hdr, ZMQ_SNDMORE);
        s.send (msg);
    }

    return 0;
}

Test:

Start the receiver waiting for a large number of messages, then have the sender transmit the same number. Once in a while the receiver does not get all of the sent messages. When this happens, restart the sender to send a single message to a different topic.

Start receiver:

[ak@linux src]$ /tmp/zrm test 16384

Start sender:

[ak@linux zmq]$ /tmp/zsm test 1111111111111111111111111111 16384 >& /dev/zero

Receiver output until it stops:

...
Topic [test] : 12436 = 11111111111111111111111111111[12436]
Topic [test] : 12437 = 11111111111111111111111111111[12437]
Topic [test] : 12438 = 11111111111111111111111111111[12438]

Then run the sender again with a different topic:

[ak@linux zmq]$ /tmp/zsm testing 222222222222222222222222222 1 >& /dev/zero

Receiver:

...
Topic [test] : 12439 = testing
Topic [test] : 12440 header only [22222222222222222222222222222[0]]
Topic [testing] : 12441 = 22222222222222222222222222222[1]

hurtonm commented 14 years ago

Thanks for elaborating. Will look into this.

tsaubergine commented 13 years ago

Hi -

I am also having this issue: the first N-1 parts of an N-part multipart message are lost on the first message sent over epgm.

Is this issue still being looked into or should I post another example? I am using zeromq version 2.1.4.

tsaubergine commented 13 years ago

Here's a basic C++ example that shows this problem. Start pgmtest2 (the subscriber), then start pgmtest1. pgmtest2 should show

01
02 
03
01 
02
03

but instead shows

03
01 
02
03 

pgmtest1.cpp (publisher)

#include <zmq.hpp>
#include <cassert>
#include <cstring>
#include <iostream>
#include <stdint.h>

int main (int argc, char *argv[])
{
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);

    //  Set the rate before connecting: options such as ZMQ_RATE are read
    //  when the PGM connection is created, not applied retroactively
    int64_t rate = 100;
    publisher.setsockopt (ZMQ_RATE, &rate, sizeof (rate));

    try {
        publisher.connect ("epgm://eth0;239.255.7.15:11142");
    }
    catch (std::exception &e) {
        std::cout << "error: " << e.what () << std::endl;
    }

    //  Send the three-part message [1, 2, 3] twice
    for (int i = 0, n = 2; i < n; ++i) {
        //  Part 1: one byte with value 1; more parts follow
        zmq_msg_t part1;
        int rc = zmq_msg_init_size (&part1, 1);
        assert (rc == 0);
        std::memset (zmq_msg_data (&part1), 1, 1);
        rc = zmq_send (publisher, &part1, ZMQ_SNDMORE);
        assert (rc == 0);

        //  Part 2: one byte with value 2; more parts follow
        zmq_msg_t part2;
        rc = zmq_msg_init_size (&part2, 1);
        assert (rc == 0);
        std::memset (zmq_msg_data (&part2), 2, 1);
        rc = zmq_send (publisher, &part2, ZMQ_SNDMORE);
        assert (rc == 0);

        //  Part 3: one byte with value 3; last part of the message
        zmq_msg_t part3;
        rc = zmq_msg_init_size (&part3, 1);
        assert (rc == 0);
        std::memset (zmq_msg_data (&part3), 3, 1);
        rc = zmq_send (publisher, &part3, 0);
        assert (rc == 0);
    }

    return 0;
}

pgmtest2.cpp (subscriber)

#include <zmq.hpp>
#include <iostream>
#include <string>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    zmq::socket_t subscriber (context, ZMQ_SUB);

    try {
        subscriber.connect ("epgm://eth0;239.255.7.15:11142");
    }
    catch (std::exception &e) {
        std::cout << "error: " << e.what () << std::endl;
    }

    //  An empty prefix subscribes to every message
    subscriber.setsockopt (ZMQ_SUBSCRIBE, "", 0);

    for (;;) {
        //  Each recv returns one part; print its first byte as a number
        zmq::message_t update;
        subscriber.recv (&update);

        std::string in (static_cast<const char*> (update.data ()), update.size ());
        if (!in.empty ())
            std::cout << static_cast<int> (in[0]) << std::endl;
    }

    return 0;
}