chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio
4.81k stars 1.2k forks source link

Partial cancellation of async_write results in wrong return value of bytes_transfered #1263

Open MatthiasVonLaer opened 1 year ago

MatthiasVonLaer commented 1 year ago

Hello,

I'm seeing a problem with the partial cancellation of the async_wirte method.

If I call async_write and cancel it with a partial cancellation, I expect the return value bytesTransfered to refer to the bytes that have been actually transfered into the TCP stream. If not all the data has been written, I expect that I can continue later on to write the remaining data to the stream, resulting in consistent data.

Is this a correct expectation?

On a Windows system I realized that this is not the case if I cancel an async_write operation on a TCP stream that is under back pressure:

It works if I change one of the following:

I've tested with Boost.Asio 1.26.0 and Visual Studio 2022 17.5 (_MSC_VER 1935). See the example code below.

Best regards, Matthias von Laer

#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <iostream>
#include <span>

using namespace boost::asio;

int main()
{
    io_context ioContext;
    cancellation_signal signal;

    const auto writeUntilCancellation = [&ioContext, &signal]() -> awaitable<void>
    {
        ip::tcp::acceptor acceptor(ioContext, ip::tcp::endpoint(ip::tcp::v4(), 1234));
        ip::tcp::socket socket(ioContext);
        co_await acceptor.async_accept(socket, use_awaitable);
        uint32_t counter = 0;
        for (;;)
        {
            std::vector<std::uint32_t> data(1024 * 1024);
            std::ranges::generate(data, [&counter]() { return counter++; });
            const auto [error, bytesTransfered] = co_await async_write(socket, buffer(data), bind_cancellation_slot(
                signal.slot(),
                experimental::as_tuple(use_awaitable)));
            if (error)
            {
                co_await this_coro::reset_cancellation_state();
                const auto remainingData = std::as_bytes(std::span(data)).subspan(bytesTransfered);
                co_await async_write(socket, buffer(std::data(remainingData), std::size(remainingData)), use_awaitable);
                throw boost::system::system_error(error);
            }
        }
    };

    const auto readCreatingBackPressure = [&ioContext]() -> awaitable<void>
    {
        ip::tcp::socket socket(ioContext);
        co_await socket.async_connect(ip::tcp::endpoint(ip::address::from_string("127.0.0.1"), 1234), use_awaitable);
        uint32_t i = 0;
        for (;;)
        {
            std::vector<std::uint32_t> data(1024 * 1024);
            co_await async_read(socket, buffer(data), use_awaitable);
            for (const auto value : data)
            {
                if (value != i++)
                {
                    std::cerr << "mismatch at " << value << ", should be " << i - 1 << ".\n";
                    std::exit(EXIT_FAILURE);
                }
            }
            // throttle to produce back pressure on the TCP stream
            co_await steady_timer(co_await this_coro::executor, std::chrono::milliseconds(200))
                .async_wait(use_awaitable);
        }
    };

    const auto cancelWhenBackpressureIsEstablished = [&signal]() -> awaitable<void>
    {
        // wait for back pressure to establish on the TCP stream
        co_await steady_timer(co_await this_coro::executor, std::chrono::seconds(2)).async_wait(use_awaitable);

        signal.emit(cancellation_type::partial);

        // wait for the program to continue without error
        co_await steady_timer(co_await this_coro::executor, std::chrono::seconds(2)).async_wait(use_awaitable);

        std::cout << "success" << std::endl;
        std::exit(EXIT_SUCCESS);
    };

    co_spawn(ioContext, writeUntilCancellation, detached);
    co_spawn(ioContext, readCreatingBackPressure, detached);
    co_spawn(ioContext, cancelWhenBackpressureIsEstablished, detached);
    ioContext.run();
}
uniqss commented 1 year ago

Oh thank you very much for you cancellation implemention. I changed the code and tested with asio-1.28.0, everything is ok on my ubuntu22.04 even when I changed the buffer to 4MB. But windows is not matching. My code is here https://github.com/uniqss/net_study/blob/main/src/cancellation.cpp , and I made some change to depend on asio only, not depending on boost.