CopernicaMarketingSoftware / AMQP-CPP

C++ library for asynchronous non-blocking communication with RabbitMQ
Apache License 2.0
864 stars 334 forks source link

impl my own IO handler #531

Closed lushengguo closed 1 month ago

lushengguo commented 1 month ago

Here is a demo from linux platform with the newest code of AMQP-CPP;

I just simplified the code for a clear review, rabbitmq deployed on same linux machine, exchange/queue already declared.

In my code, I posted every opeartion of IO, creat channel/reliable and call method of it to boost::asio::io_context, and it's obvious that all amqp operation as it run in single thread;

This demo impls read and write, it could correctly handshake with rabbitmq and declare exchange/queue and bind them in one thread, also publish message in main thread, when I tried to publish message from different thread, error occurs, what confused me is that "LibBoostAsioHandler and AMQP::TcpConnection" works well in this testcase, so posting task from another thread to io_context is ok.

Readme.md said that I should impl IO by myself, I saw TcpConnection maintained some state of amqp connection, should I maintain those inner state either to ensure some IO sequence?

by the way, I'm using windows, so I have to do IO myself, just running test code on linux to make sure it's not platform's fault.

#include "spdlog/spdlog.h"
#include <amqpcpp.h>
#include <amqpcpp/libboostasio.h>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <thread>

boost::asio::io_context io_context(4);

class AmqpIOHandler : public AMQP::ConnectionHandler {
private:
  std::string host_;
  int port_;
  boost::asio::ip::tcp::socket socket_;
  AMQP::Connection *connection_;

public:
  AmqpIOHandler(const std::string &host, int port, );
  ~AmqpIOHandler() final = default;

  void onData(AMQP::Connection *connection, const char *data,
              size_t size) override;
  void onReady(AMQP::Connection *connection) override {}
  void onError(AMQP::Connection *connection, const char *message) override {}
  void onClosed(AMQP::Connection *connection) override {}
  uint16_t onNegotiate(AMQP::Connection *connection,
                       uint16_t interval) override {
    return interval;
  }

private:
  boost::asio::awaitable<void> readData();
};

AmqpIOHandler::AmqpIOHandler(const std::string &host, int port, )
    : host_(host), port_(port), socket_(io_context), connection_(nullptr),
      heartbeat_interval_(0), heartbeat_timer_(io_context),
      reconnect_callback_(reconnect_callback) {
  try {
    boost::asio::ip::tcp::resolver resolver(io_context);
    boost::asio::connect(socket_,
                         resolver.resolve({host, std::to_string(port)}));
    boost::asio::post(io_context, [=, this]() mutable {
      boost::asio::co_spawn(io_context, readData(), boost::asio::detached);
    });
  } catch (const boost::system::system_error &e) {
    // LOG ERROR AND HANDLE
  }
}

boost::asio::awaitable<void> AmqpIOHandler::readData() {
  try {
    for (;;) {
      boost::system::error_code ec;
      std::vector<char> read_data(1024);
      size_t length = co_await socket_.async_read_some(
          boost::asio::buffer(read_data), boost::asio::use_awaitable);

      if (ec) {
        // LOG ERROR AND HANDLE
      }

      connection_->parse(read_data.data(), length);
    }
  } catch (std::exception &e) {
    // LOG ERROR AND HANDLE
  }
}

uint16_t AmqpIOHandler::onNegotiate(AMQP::Connection *connection,
                                    uint16_t interval) {
  return interval;
}

void AmqpIOHandler::onData(AMQP::Connection *connection, const char *data,
                           size_t size) {
  connection_ = connection;
  std::vector<char> d(data, data + size);

  auto send_task = [this, d = std::move(d)]() -> boost::asio::awaitable<void> {
    boost::system::error_code ec;
    size_t wrote_len = co_await boost::asio::async_write(
        socket_, boost::asio::buffer(d),
        boost::asio::redirect_error(boost::asio::use_awaitable, ec));
    if (wrote_len != d.size()) {
      // LOG ERROR AND HANDLE
    }

    if (ec) {
      // LOG ERROR AND HANDLE
    }
  };

  boost::asio::post(io_context, [this, task = std::move(send_task)]() mutable {
    co_spawn(io_context, task, boost::asio::detached);
  });
}

int main() {
  std::jthread run([&]() {
    boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto) { io_context.stop(); });
    io_context.run();
  });

  sleep(1);

  AmqpIOHandler handler("localhost", 5672);
  AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

  sleep(1);

  AMQP::Channel channel(&connection);
  AMQP::Reliable reliable(channel);

  for (int i = 0; i < 10000; i++) {
    boost::asio::post(io_context, [&]() {
      reliable.publish("exchange", "", "this is a test message")
          .onAck([=]() {})
          .onLost([=]() { /* LOG ERROR AND HANDLE */ });
    });
  }

  std::jthread run2([&]() {
    for (int i = 0; i < 10000; i++) {
      boost::asio::post(io_context, [&]() {
        reliable.publish("exchange", "", "this is a test message")
            .onAck([=]() {})
            .onLost([=]() { /* LOG ERROR AND HANDLE */ });
      });
    }
  });

  for (;;)
    ;
}
EmielBruijntjes commented 1 month ago

Your example uses three threads (main, and two std::jthread instances), and all these threads somehow interact directly or indirectly with the AMQP::Connection instance. This is not good. This will result in undefined behavior as the order in which data is sent over the socket is out of control. Solution: do not use threads.