CopernicaMarketingSoftware / AMQP-CPP

C++ library for asynchronous non-blocking communication with RabbitMQ
Apache License 2.0
884 stars 340 forks source link

FRAME_ERROR - type 3, first 16 octets = <<"[,{\"data_body\":{">>: {invalid_frame_end_marker, 34} #530

Closed lushengguo closed 5 months ago

lushengguo commented 5 months ago

RabbitMQ 3.10.5 Erlang 25.0.3 Amqp-cpp/4.3.26

windows platform, I implmented interface of AMQP::ConnectionHandler with boost::asio;

After creat AMQP::Connection and wait for onReady called(using mutex and conditonal_variable), multi-thread program could see AMQP::Connection, then I post every function call or struct construction of AMQP library to boost::asio::io_context thread like code below; IO post to io_context either;

But still get error like title list, I would appriciated it if I could get some advice from here.

SharedPromises AmqpProducer::declare_and_bind(const std::string& exchange, const std::string& queue,
    const std::string& routing_key, AMQP::ExchangeType exchange_type)
{
    SharedPromises promises(new std::vector<std::promise<void>>(3));

    auto task = [=, this]() {
        bool exchange_declared = false;
        bool queue_declared = false;
        bool exchange_queue_bind = false;
        {
            std::lock_guard<std::mutex> lock(declare_and_bind_mutex_);
            exchange_declared = declared_exchanges_.count(exchange) > 0;
            queue_declared = declared_queues_.count(queue) > 0;
            exchange_queue_bind = bindings_[exchange][routing_key].first.count(queue) > 0;
        }

        // If a channel ever sees an error, the entire channel is invalidated, including subsequent instructions that
        // were already sent. This means that if you call multiple methods in a row, and the first method fails, all
        // subsequent methods will not be executed:

        // declare and bind op use different channel will get verbose error message
        if (!exchange.empty() && !exchange_declared) {
            AMQP::Channel channel1(connection_.get());
            channel1.declareExchange(exchange, exchange_type, AMQP::durable)
                .onSuccess([=, this]() {
                    SPDLOG_INFO(
                        "declare exchange:{} of {}:{}{} success", exchange, config_.ip, config_.port, config_.vhost);

                    std::lock_guard<std::mutex> lock(declare_and_bind_mutex_);
                    declared_exchanges_.insert(exchange);
                    promises->at(0).set_value();
                })
                .onError([=, this](const char* error_hint) {
                    SPDLOG_INFO("declare exchange:{} of {}:{}{} error, error_hint:{}", exchange, config_.ip,
                        config_.port, config_.vhost, error_hint);
                    promises->at(0).set_value();
                });
        } else {
            promises->at(0).set_value();
        }

        if (!queue.empty() && !queue_declared) {
            AMQP::Channel channel2(connection_.get());
            channel2.declareQueue(queue)
                .onSuccess([=, this]() {
                    SPDLOG_INFO("declare queue:{} of {}:{}{} success", queue, config_.ip, config_.port, config_.vhost);

                    std::lock_guard<std::mutex> lock(declare_and_bind_mutex_);
                    declared_queues_.insert(queue);
                    promises->at(1).set_value();
                })
                .onError([=, this](const char* error_hint) {
                    SPDLOG_INFO("declare queue:{} of {}:{}{} error, error_hint:{}", queue, config_.ip, config_.port,
                        config_.vhost, error_hint);
                    promises->at(1).set_value();
                });
        } else {
            promises->at(1).set_value();
        }

        if (!queue.empty() && !exchange.empty() && !exchange_queue_bind) {
            AMQP::Channel channel3(connection_.get());
            channel3.bindQueue(exchange, queue, routing_key)
                .onSuccess([=, this]() {
                    SPDLOG_INFO("bind queue:{} to exchange:{} of {}:{}{} success", queue, exchange, config_.ip,
                        config_.port, config_.vhost);

                    std::lock_guard<std::mutex> lock(declare_and_bind_mutex_);
                    bindings_[exchange][routing_key].first.insert(queue);
                    promises->at(2).set_value();
                })
                .onError([=, this](const char* error_hint) {
                    SPDLOG_INFO("bind queue:{} to exchange:{} of {}:{}{} error, error_hint:{}", queue, exchange,
                        config_.ip, config_.port, config_.vhost, error_hint);
                    promises->at(2).set_value();
                });
        } else {
            promises->at(2).set_value();
        }
    };

    boost::asio::post(net::get_io_context(), task);

    return promises;
}

void AmqpProducer::async_publish(
    const std::string& exchange, const std::string& routing_key, const std::string& message)
{
    auto task = [=, this]() {
        AMQP::Channel* channel = nullptr;
        std::lock_guard<std::mutex> lock(declare_and_bind_mutex_);
        if (!bindings_[exchange][routing_key].second)
            bindings_[exchange][routing_key].second = std::make_unique<AMQP::Channel>(connection_.get());

        channel = bindings_[exchange][routing_key].second.get();

        AMQP::Reliable reliable_channel(*channel);
        AMQP::Envelope envelope(message);
        std::string guid = get_guid();
        envelope.setMessageID(guid);
        SPDLOG_INFO("publish message uid:{}", guid);

        reliable_channel.publish(exchange, routing_key, envelope)
            .onAck([this, message, guid]() { SPDLOG_DEBUG("publish success message_body:{}, guid:{}", message, guid); })
            .onLost([this, message, guid]() { SPDLOG_ERROR("publish lost message_body:{}, guid:{}", message, guid); })
            /* onLost means onNack and onError, onError supplies message while onNack didn't, calling onError is for
               verbose log */
            .onError([this, message, guid](const char* error_hint) {
                SPDLOG_ERROR("publish error message_body:{}, guid:{}, error_hint:{}", message, guid, error_hint);
            });
    };

    boost::asio::post(net::get_io_context(), task);
}
EmielBruijntjes commented 5 months ago

This error typically occurs when the AMQP connections is accesses from multiple threads. My guess is that this is the case here too.

However, since your code is incomplete I cannot check this. For now I'll close this issue because it is not reproducable.