chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio
4.81k stars 1.2k forks source link

Using co simultaneously in a C+20 environment_ await async_ Read and | | async_ Wait will cause serious errors, with test Dome attached #1286

Open MechanicsMingYan opened 1 year ago

MechanicsMingYan commented 1 year ago
  1. Will generate exceptions that cannot be caught
  2. boost::asio::detail::cancellation_handler_base::call[virtual] == 0xFFFFFFFFFFFFFFFF。 /// Emits the signal and causes invocation of the slot's handler, if any. void emit(cancellation_typet type) { if (handler) handler_->call(type); }

ServerDome:

include

include <boost/asio.hpp>

include <boost/asio/ssl.hpp>

include <boost/asio/thread_pool.hpp>

include <boost/asio/experimental/awaitable_operators.hpp>

include <boost/asio/experimental/channel.hpp>

include <boost/asio/experimental/as_tuple.hpp>

include <boost/asio/co_spawn.hpp>

include <boost/asio/detached.hpp>

include <boost/asio/io_context.hpp>

include <boost/asio/ip/tcp.hpp>

include <boost/asio/signal_set.hpp>

include

using boost::asio::ip::tcp; using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::steady_timer; using boost::asio::use_awaitable; using boost::asio::thread_pool; using std::chrono::steady_clock; using boost::asio::as_tuple; using boost::asio::buffer; using boost::asio::experimental::channel; using boost::asio::io_context; using boost::asio::steady_timer;

namespace this_coro = boost::asio::this_coro; using ssl_socket = boost::asio::ssl::stream; constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable); using namespace std::literals::chrono_literals; using namespace boost::asio::experimental::awaitable_operators;

awaitable echo(tcp::socket socket) { steady_timer timer(co_await this_coro::executor); try { char data[2000]; for (;;) { timer.expires_after(2ms); auto result1 = co_await(async_read(socket, buffer(data, 2000), use_nothrow_awaitable) || timer.async_wait(use_nothrow_awaitable)); if (result1.index() == 1) { //std::cout << "time out." << std::endl; } else { auto [e,n] = std::get<0>(result1); if (!e) { if (n) co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable); } else { std::cout << e.message() << std::endl; } }

    }
}
catch (std::exception& e)
{
    std::printf("echo Exception: %s\n", e.what());
}

}

awaitable listener() { auto executor = co_await this_coro::executor; tcp::acceptor acceptor(executor, { tcp::v4(), 5555 }); for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); co_spawn(executor, echo(std::move(socket)), detached); } }

int main(int argc, char* argv[]) { try { thread_pool pol(8); boost::asio::signal_set signals(pol, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { pol.stop(); });

    co_spawn(pol, listener(), detached);

    pol.wait();
}
catch (std::exception& e)
{
    std::printf("Exception: %s\n", e.what());
}

}

ClientDome:

include

include <boost/asio.hpp>

include <boost/asio/ssl.hpp>

include <boost/asio/thread_pool.hpp>

include <boost/asio/experimental/awaitable_operators.hpp>

include <boost/asio/experimental/channel.hpp>

include <boost/asio/experimental/as_tuple.hpp>

include <boost/asio/co_spawn.hpp>

include <boost/asio/detached.hpp>

include <boost/asio/io_context.hpp>

include <boost/asio/ip/tcp.hpp>

include <boost/asio/signal_set.hpp>

include <boost/asio/write.hpp>

include <boost/asio/experimental/as_tuple.hpp>

include

using boost::asio::ip::tcp; using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::steady_timer; using boost::asio::use_awaitable; using std::chrono::steady_clock; using boost::asio::thread_pool; using boost::asio::as_tuple; using boost::asio::buffer; using boost::asio::experimental::channel; using boost::asio::io_context;

namespace this_coro = boost::asio::this_coro; constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable); using namespace boost::asio::experimental::awaitable_operators; using namespace std::literals::chrono_literals; thread_pool io_context(3);

awaitable timeout(steady_clock::duration duration) { steady_timer timer(co_await this_coro::executor); timer.expires_after(duration); co_await timer.async_wait(use_nothrow_awaitable); }

awaitable echo3(tcp::socket socket) { try { char data[4024]; for (;;) { co_await timeout(std::chrono::milliseconds(2)); co_await async_write(socket, boost::asio::buffer(data, 2000), use_awaitable); } } catch (const std::exception& e) { std::printf("echo2 Exception: %s\n", e.what()); } }

awaitable listener(tcp::socket socket) { try { auto executor = co_await this_coro::executor;

    auto listen_endpoint =
        *tcp::resolver(socket.get_executor()).resolve("127.0.0.1", std::to_string(5555),
            tcp::resolver::passive).begin();
    co_await socket.async_connect(listen_endpoint, use_awaitable);
    co_spawn(executor, echo3(std::move(socket)), detached);
}
catch (const std::exception& e)
{
    std::printf("connect Exception: %s\n", e.what());
}

}

int main() { try {

    boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto) { io_context.stop(); });

    for (size_t i = 0; i < 3; i++)
    {
        co_spawn(io_context, listener(std::move(tcp::socket(io_context))), detached);
    }

    io_context.wait();
}
catch (std::exception& e)
{
    std::printf("main Exception: %s\n", e.what());
}

}