zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0
9.8k stars 2.36k forks source link

Reason for zmq_assert (!_current_out) in xsend #4731

Open vome101 opened 3 months ago

vome101 commented 3 months ago

I'm getting an assert at zmq_assert (!_current_out). What's the reason for this assert?

`_current_out` is set to `out_pipe->pipe` only when execution goes through the `if (!_more_out)` branch. Otherwise `_current_out` won't be used.

I'm using version 4.3.4.

int zmq::router_t::xsend (msgt *msg) { // If this is the first part of the message it's the ID of the // peer to send the message to. if (!_more_out) { zmq_assert (!_current_out);

    //  If we have malformed message (prefix with no subsequent message)
    //  then just silently ignore it.
    //  TODO: The connections should be killed instead.
    if (msg_->flags () & msg_t::more) {
        _more_out = true;

        //  Find the pipe associated with the routing id stored in the prefix.
        //  If there's no such pipe just silently ignore the message, unless
        //  router_mandatory is set.
        out_pipe_t *out_pipe = lookup_out_pipe (
          blob_t (static_cast<unsigned char *> (msg_->data ()),
                  msg_->size (), zmq::reference_tag_t ()));

        if (out_pipe) {
            _current_out = out_pipe->pipe;

            // Check whether pipe is closed or not
            if (!_current_out->check_write ()) {
                // Check whether pipe is full or not
                const bool pipe_full = !_current_out->check_hwm ();
                out_pipe->active = false;
                _current_out = NULL;

                if (_mandatory) {
                    _more_out = false;
                    if (pipe_full)
                        errno = EAGAIN;
                    else
                        errno = EHOSTUNREACH;
                    return -1;
                }
            }
        } else if (_mandatory) {
            _more_out = false;
            errno = EHOSTUNREACH;
            return -1;
        }
    }

    int rc = msg_->close ();
    errno_assert (rc == 0);
    rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

//  Ignore the MORE flag for raw-sock or assert?
if (options.raw_socket)
    msg_->reset_flags (msg_t::more);

//  Check whether this is the last part of the message.
_more_out = (msg_->flags () & msg_t::more) != 0;

//  Push the message into the pipe. If there's no out pipe, just drop it.
if (_current_out) {
    // Close the remote connection if user has asked to do so
    // by sending zero length message.
    // Pending messages in the pipe will be dropped (on receiving term- ack)
    if (_raw_socket && msg_->size () == 0) {
        _current_out->terminate (false);
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
        _current_out = NULL;
        return 0;
    }

    const bool ok = _current_out->write (msg_);
    if (unlikely (!ok)) {
        // Message failed to send - we must close it ourselves.
        const int rc = msg_->close ();
        errno_assert (rc == 0);
        // HWM was checked before, so the pipe must be gone. Roll back
        // messages that were piped, for example REP labels.
        _current_out->rollback ();
        _current_out = NULL;
    } else {
        if (!_more_out) {
            _current_out->flush ();
            _current_out = NULL;
        }
    }
} else {
    const int rc = msg_->close ();
    errno_assert (rc == 0);
}

//  Detach the message from the data buffer.
const int rc = msg_->init ();
errno_assert (rc == 0);

return 0;

}