chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio

Cannot forbid "speculative operations" when required for correctness #357

Open dudamoos opened 5 years ago

dudamoos commented 5 years ago

I have an application that reads/writes data to Linux FIFOs (named pipes) that a sibling application (required for data logging) then forwards to/from a hardware bus. The protocol I am implementing requires certain actions to be retried if no valid response is received after a timeout. Additionally, data read from the bus carries error-detection properties (such as parity) that must be verified; bad data is ignored. If bad data is received, I have to retry the read without resetting the timeout. This introduces a potential race condition in the read handler.

Suppose the read handler receives data with bad parity while both the read handler and the timeout expiry handler are already queued. The read handler retries the read. If that retried read completes immediately and again yields bad parity, the read handler must retry a second time. By then the timeout expiry handler has already run and found no pending read to cancel, so the second retried read could be pending forever (which will break the protocol state machine).

The solution to this is to never allow reads (or at least that specific read) to complete immediately (or "speculatively" as it is referred to internally). Since the epoll_reactor always processes timeout handlers after IO completion handlers, disallowing speculative reads would prevent any retried reads from being "uncancellable" by the timeout handler. However, as far as I know, there is no way to forbid speculative reads in Asio (and by extension Boost.Asio, which I am currently using).
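The interleaving can be illustrated with a toy model of a handler queue. This is only a sketch under stated assumptions, not Asio code: `ToyLoop`, `start_read`, `on_read`, `on_timeout`, and `read_stuck_after_timeout` are hypothetical names, and a "speculative" read is modelled as one that consumes already-available data at initiation and queues its handler immediately.

```cpp
#include <deque>
#include <functional>
#include <utility>

// Toy model only; none of these names exist in Asio.
struct ToyLoop {
    std::deque<std::function<void()>> ready; // handlers queued for invocation
    std::deque<bool> incoming;               // readable frames: true = good parity
    bool speculative = false;                // may a read complete at initiation?
    bool read_pending = false;               // a read is parked waiting for data
    bool timed_out = false;                  // the timeout handler has run

    // Initiate a read. A speculative read that finds data consumes it right
    // away and queues its completion handler behind whatever is already queued.
    void start_read() {
        if (speculative && !incoming.empty()) {
            const bool good = incoming.front();
            incoming.pop_front();
            ready.push_back([this, good] { on_read(good); });
        } else {
            read_pending = true;
        }
    }

    // Read handler: bad parity means retry without touching the timeout.
    void on_read(bool good) {
        if (!good) start_read();
    }

    // Timeout handler: it can only cancel a read that is still pending.
    void on_timeout() {
        timed_out = true;
        read_pending = false;
    }

    // Invoke queued handlers in FIFO order until none are left.
    void run() {
        while (!ready.empty()) {
            auto handler = std::move(ready.front());
            ready.pop_front();
            handler();
        }
    }
};

// Returns true if a read is still pending after the timeout handler has run.
bool read_stuck_after_timeout(bool speculative) {
    ToyLoop loop;
    loop.speculative = speculative;
    loop.incoming.push_back(false);                         // one more bad frame is readable
    loop.ready.push_back([&loop] { loop.on_read(false); }); // first read: bad parity
    loop.ready.push_back([&loop] { loop.on_timeout(); });   // timeout queued right behind it
    loop.run();
    return loop.timed_out && loop.read_pending;
}
```

In this model `read_stuck_after_timeout(true)` reports a read left pending after the timeout fired, while `read_stuck_after_timeout(false)` does not, because the retried read is still parked when the timeout handler runs and can be cancelled.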

This issue is described in more detail in this response on Stack Overflow, though the question answered there is different from my situation.

For the moment, I believe I can work around the issue by forcing the use of select_reactor instead of epoll_reactor. However, if speculative operations are ever added to select_reactor in the future, then the race condition would reappear. The best solution would be to have an official API to forbid speculative operations on backends that support them.

dudamoos commented 5 years ago

Of note: I am actually using Boost.Asio, but I filed this issue here because I believe this is the correct place to file issues that are not unique to the Boost version of Asio.

vinniefalco commented 5 years ago

"the epoll_reactor always processes timeout handlers after IO completion handlers"

Can you please give me more information about this (a link to the related source code lines would be great)?

I have a feeling that your use-case could be addressed by writing your own composed operation. I have done something similar in Beast, to create a stream that supports a timeout. There is a rare case which sounds very much like your scenario, where the timer completion handler is already queued (but not invoked) when the async_read completes. I handle it by inspecting the return value of steady_timer::cancel and taking various action. You can see that here: https://github.com/boostorg/beast/blob/2dcb11b89a73356792cd4dfd5660b907e9b2cb81/include/boost/beast/core/impl/basic_timeout_stream.hpp#L145
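The cancel-and-inspect idea can be sketched independently of Beast. The helper below is hypothetical (`resolve_read_result` and `FakeTimer` are not part of Asio or Beast) and is written against any timer type whose `cancel()` returns the number of waits it cancelled, as `steady_timer::cancel` does. A return of zero is taken to mean the timer's handler has already run or been queued. Using `std::error_code` and `std::errc::timed_out` is an assumption made to keep the sketch self-contained, not what Beast does.

```cpp
#include <cstddef>
#include <system_error>

// Called from the read completion handler. If the read succeeded but the
// timer could no longer be cancelled, the timeout has effectively fired,
// so the result is reported as a timeout instead of success.
template <class Timer>
std::error_code resolve_read_result(Timer& timer, std::error_code read_ec) {
    const std::size_t cancelled = timer.cancel();
    if (!read_ec && cancelled == 0) {
        return std::make_error_code(std::errc::timed_out);
    }
    return read_ec; // a genuine read error (or success) passes through
}

// Hypothetical stand-in for a timer, used only to exercise the helper.
struct FakeTimer {
    std::size_t waiting; // number of outstanding waits
    std::size_t cancel() {
        const std::size_t n = waiting;
        waiting = 0;
        return n;
    }
};
```

With a `FakeTimer` that still has one outstanding wait, a successful read stays successful. With a `FakeTimer` that has none, the same read is reported as timed out.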

I suspect that you might be able to do something similar. Another technique is to make the timer completion invoke the read completion by saving the read completion and "attaching" it to the timer completion. For example, when your read composed operation completes, instead of performing the upcall (invoking the user's completion handler) you "save" the composed operation with something like this: https://github.com/boostorg/beast/blob/2dcb11b89a73356792cd4dfd5660b907e9b2cb81/include/boost/beast/core/saved_handler.hpp#L18

Store the saved_handler on your I/O object as a data member (you have to make your own object, kind of like ssl::stream, where you can store state). When the timer completes, check the saved handler and if it holds a value then you invoke it to perform the upcall.

dudamoos commented 5 years ago

In the epoll_reactor's run method it adds all descriptors ready to perform IO to the op_queue, then adds timer completions to the op_queue at the end if it has reason to check them: https://github.com/chriskohlhoff/asio/blob/4ebd33ce55c59a864b16f6b7f76590cc1aebd56c/asio/include/asio/detail/impl/epoll_reactor.ipp#L446-L565

Looking more into the epoll_reactor implementation (specifically at epoll_reactor::descriptor_state), I also concluded that, since I will only ever have one read operation outstanding at any given time, that read operation is cancellable up until its handler is invoked (and it actually performs the IO). This isn't true for select_reactor, but that hasn't been an issue so far.

I'm not sure about writing a composed operation. I could probably go review a couple of CppCon videos on the Networking TS to make sure I'm doing it right. I was originally thinking something along the lines of what you suggested: cancelling the timer every time and checking if it was successfully cancelled to determine if I'd timed out but missed having my read cancelled for some reason. I found the Beast HTTP crawler example to illustrate it more simply: https://github.com/boostorg/beast/blob/2dcb11b89a73356792cd4dfd5660b907e9b2cb81/example/http/client/crawl/http_crawl.cpp#L195-L224

I originally didn't want to do that because it would complicate the code and because parts of the protocol have multiple reasons to retry something (some of which are limited before the operation is considered a failure). I can't just close my file descriptor on a timeout because I have no way to re-open it. Admittedly I didn't think about using a composed operation, so the timeout handling would have all been in the business part of the code. I'll look into creating something like your timeout_stream, but wrapping a posix::stream_descriptor instead of a socket. We're on a short schedule, though, so it would have to be something I can make in a day or two.

This project is the first time I've used Boost.Asio, so I may not be entirely familiar with all of its ups and downs. I was originally considering using epoll/select and timerfds directly (which wouldn't have had this issue because there are no handlers or queues), but I decided to use Boost.Asio because I thought it would help keep the code clean and simple. If I can solve the timeout/read completion race, then there shouldn't be any other difficult race conditions to deal with (and none that wouldn't have happened if I were to use epoll directly).

vinniefalco commented 5 years ago

I can try to answer specific questions, but these two statements are quite in opposition:

"We're on a short schedule..."

and

"This project is the first time I've used Boost.Asio,"

dudamoos commented 5 years ago

They are, but it is what it is. I'm also currently stuck with Boost 1.68, so I can't use timeout_stream directly since it was released in 1.69.

I think something like the shared_handler solution may work well. If I can wrap all of the read-with-timeout operations in something that registers itself using a shared handler object, then it can intercept all of the timeout/completion handlers and handle them all correctly (and should more easily scale to a read that can be cancelled by multiple different timeouts). In theory, the shared completion handler would check each of the timers to ensure the read completed prior to expiring, mark itself as having run, and then invoke the true completion handler with the result. If any of the timeouts expire, it would similarly mark itself as having run and then invoke the true completion handler with the operation_aborted error code. As soon as it marks itself as having run, it can cancel all of the timeouts (or the outstanding read) that didn't go first and then ignore any later handler invocations. The main thing I would have to do would be to make it compatible with Asio's "universal asynchronous model" since I use stackful coroutines in addition to callbacks. Additionally, each time I attempt a read, I would have to reschedule the timeouts because they should only be truly cancelled if I get valid data (e.g. if I get data with bad parity or that isn't addressed to me I should ignore it and not reset the timeout).
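The "first one to finish wins" idea in the paragraph above can be sketched without any Asio types. `OneShotCompletion`, `DemoResult`, and `run_demo` are hypothetical names. The sketch assumes the read handler and every timeout handler share one such object (for example through a `std::shared_ptr`) and run on the same thread or strand, so the flag needs no locking. `std::errc::operation_canceled` stands in for Asio's `operation_aborted`.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <system_error>
#include <utility>

// Delivers exactly one result to the user's handler, whichever event is first.
class OneShotCompletion {
public:
    using Handler = std::function<void(std::error_code, std::size_t)>;

    explicit OneShotCompletion(Handler h) : handler_(std::move(h)) {}

    // Called by the read completion handler.
    void read_finished(std::error_code ec, std::size_t n) { finish(ec, n); }

    // Called by any timer handler whose wait expired (not one that was cancelled).
    void timer_expired() {
        finish(std::make_error_code(std::errc::operation_canceled), 0);
    }

private:
    void finish(std::error_code ec, std::size_t n) {
        if (done_) return; // a later event lost the race and is ignored
        done_ = true;
        // A real composed operation would cancel the losing timers or the
        // outstanding read at this point.
        Handler h = std::move(handler_);
        h(ec, n);
    }

    Handler handler_;
    bool done_ = false;
};

struct DemoResult {
    int calls = 0;
    std::error_code ec;
};

// Fires both events in the requested order and records what the user saw.
DemoResult run_demo(bool timer_first) {
    DemoResult result;
    auto op = std::make_shared<OneShotCompletion>(
        [&result](std::error_code ec, std::size_t) {
            ++result.calls;
            result.ec = ec;
        });
    if (timer_first) {
        op->timer_expired();
        op->read_finished(std::error_code(), 4);
    } else {
        op->read_finished(std::error_code(), 4);
        op->timer_expired();
    }
    return result;
}
```

Whichever order the two events arrive in, the user's handler runs once: with success when the read is first, with the cancellation code when the timer is first.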

I saw a page in the Beast docs about writing composed operations - I'm assuming that would be a good place to start. I noticed that it uses handler_ptr, though, instead of shared_handler. Is there any functional difference?

vinniefalco commented 5 years ago

Phew, that's a lot to unpack at once. handler_ptr is for when you need stable storage for an operation, because normally your intermediate handler gets moved with every I/O. As for shared_handler, I don't know what that is.

If timeout_stream looks helpful, you should just copy the sources for it and its dependencies into your project; it shouldn't be too bad at all.

dudamoos commented 5 years ago

I might try copying and modifying timeout_stream to suit my needs (namely, supporting multiple timeouts without closing the stream). I meant saved_handler (which you linked to above) rather than shared_handler - typo.

dudamoos commented 5 years ago

I may have found a way to do it using composed operations. It was actually simpler than I thought it would be, though reading through Boost.Beast's timeout_stream implementation helped a lot. I prototyped 2 versions, but ended up using something like the second version where the read handler is explicitly a yield_context. Neither of these use the facilities available in Boost.Beast, so you might be able to improve them. They also capture the timers by reference, which could cause problems if your timers are part of your handler object (which is normally possible - in my experience timers can be moved while there are pending waits). Since timers aren't copyable, I'm not sure how much better it's possible to do, though. It's possible the composed operation could create timers and just be given time_points or durations as arguments, but I didn't want to deal with that just yet.
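Both prototypes below constrain `Timers...` with an `all_same` trait whose definition is not included in this comment. A minimal sketch, assuming it simply requires every type in the pack to be identical (the real definition may differ), could look like this in C++14:

```cpp
#include <type_traits>

// Assumed meaning: true when every type in the pack is the same type.
// Packs with zero or one type are treated as trivially "all the same".
template <typename... Ts>
struct all_same : std::true_type {};

// Two or more types: the first two must match, and the rest must match too.
template <typename T, typename U, typename... Rest>
struct all_same<T, U, Rest...>
    : std::integral_constant<bool,
          std::is_same<T, U>::value && all_same<U, Rest...>::value> {};
```

With this definition, passing timers of two different types removes the function from overload resolution instead of failing later inside `std::common_type_t`.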

First version (implemented rather similarly to how timeout_stream handles timeouts, but invoking stream.cancel() instead of socket.close()):

// Theoretically this should be safer to run simultaneously with other asynchronous operations
// related to `stream` since `stream.cancel()` is guaranteed to only be called once and even then
// only if the `handler` has not yet run. It's probably still not safe, though.
template <typename AsyncReadStream, typename MutableBufferSequence, typename ReadHandler,
    typename... Timers>
inline typename std::enable_if_t<all_same<Timers...>::value,
    BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler,
        void (boost::system::error_code, std::size_t))>
async_read_multi_timeout(AsyncReadStream& stream,
                         const MutableBufferSequence& buffers,
                         BOOST_ASIO_MOVE_ARG(ReadHandler) handler,
                         Timers&... timers)
{
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a ReadHandler.
    BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;

    // Required because it's not possible to create an array of references.
    // I also think the variadic parameter interface is nicer to use.
    using array_type = std::array<std::common_type_t<Timers...>*, sizeof...(Timers)>;

    // This takes the completion token and from it creates
    // - completion.completion_handler:
    //    A completion handler (callback) with the specified signature.
    // - completion.result:
    //    An object from which we obtain the result of the initiating function.
    using completion_type = boost::asio::async_completion<ReadHandler,
        void (boost::system::error_code, std::size_t)>;
    using completion_handler_type = typename completion_type::completion_handler_type;
    // These allow easy binding for executors and allocators
    using pick_exec = boost::asio::associated_executor<completion_handler_type,
        typename AsyncReadStream::executor_type>;
    using pick_alloc = boost::asio::associated_allocator<completion_handler_type>;

    struct read_completion_handler {
        AsyncReadStream& stream; // just to get the executor
        std::shared_ptr<bool> completed;
        // If N is large this could be expensive to include inline.
        const array_type timers;
        completion_handler_type handler;

        // completion handler for the async_read operation
        void operator()(boost::system::error_code ec, std::size_t bytes_transferred) {
            std::size_t n = 0;
            // `timers` holds pointers, so iterate by pointer rather than by reference.
            for (auto* t : timers) n += t->cancel();

            if (!ec) {
                if (*completed) { // timeout handler already invoked
                    BOOST_ASSERT(n < timers.size());
                    ec = boost::asio::error::operation_aborted; // need a proper timeout error
                } else if (n < timers.size()) {
                    ec = boost::asio::error::operation_aborted; // need a proper timeout error
                } else {
                    BOOST_ASSERT(n == timers.size());
                }
            }

            *completed = true;
            handler(ec, bytes_transferred);
        }

        // Composed operations must preserve the executor of the user-supplied
        // completion handler for correctness.
        using executor_type = typename pick_exec::type;
        // Get the executor associated with the completion handler, or, if there is
        // none, the stream's I/O executor.
        executor_type get_executor() const noexcept {
            return pick_exec::get(handler, stream.get_executor());
        }

        // Preserving the allocator of the user-supplied completion handler isn't
        // required, but you should anyways.
        using allocator_type = typename pick_alloc::type;
        // Get the allocator associated with the completion handler, or, if there is
        // none, the default allocator.
        allocator_type get_allocator() const noexcept {
            return pick_alloc::get(handler);
        }
    };

    completion_type completion(handler);
    std::shared_ptr<bool> completed = std::allocate_shared<bool>(
        pick_alloc::get(completion.completion_handler),
        false);

    for (auto t : array_type{&timers...})
        t->async_wait(boost::asio::bind_executor(
            pick_exec::get(completion.completion_handler, stream.get_executor()),
            [completed, &stream](const boost::system::error_code& ec) mutable {
                if (ec == boost::asio::error::operation_aborted) return;
                BOOST_ASSERT(!ec);
                if (!*completed) {
                    stream.cancel();
                    *completed = true;
                }
            }));

    boost::asio::async_read(stream, buffers, read_completion_handler{
        stream, completed, {&timers...}, std::move(completion.completion_handler)});
    return completion.result.get();
}

Second version (more like what I ended up using - I take advantage of the fact that there will never be more than 1 outstanding read operation at a time):

// This version can't be run simultaneously with any other asynchronous operations on stream, but in
// exchange it doesn't need to allocate any memory.
template <typename AsyncReadStream, typename MutableBufferSequence, typename ReadHandler,
    typename... Timers>
inline typename std::enable_if_t<all_same<Timers...>::value,
    BOOST_ASIO_INITFN_RESULT_TYPE(ReadHandler, void (boost::system::error_code, std::size_t))>
async_read_multi_timeout_2(AsyncReadStream& stream,
                           const MutableBufferSequence& buffers,
                           BOOST_ASIO_MOVE_ARG(ReadHandler) handler,
                           Timers&... timers)
{
    // If you get an error on the following line it means that your handler does
    // not meet the documented type requirements for a ReadHandler.
    BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;

    // Required because it's not possible to create an array of references.
    // I also think the variadic parameter interface is nicer to use.
    using array_type = std::array<std::common_type_t<Timers...>*, sizeof...(Timers)>;

    // This takes the completion token and from it creates
    // - completion.completion_handler:
    //    A completion handler (callback) with the specified signature.
    // - completion.result:
    //    An object from which we obtain the result of the initiating function.
    using completion_type = boost::asio::async_completion<ReadHandler,
        void (boost::system::error_code, std::size_t)>;
    using completion_handler_type = typename completion_type::completion_handler_type;
    // These allow easy binding for executors and allocators
    using pick_exec = boost::asio::associated_executor<completion_handler_type,
        typename AsyncReadStream::executor_type>;
    using pick_alloc = boost::asio::associated_allocator<completion_handler_type>;

    struct read_completion_handler {
        AsyncReadStream& stream; // just to get the executor
        boost::system::error_code error;
        std::size_t bytes_transferred;
        // If N is large this could be expensive to include inline, but that's unlikely.
        const array_type timers;
        completion_handler_type handler;

        // completion handler for the async_read operation (phase 1)
        void operator()(boost::system::error_code ec, std::size_t n) {
            // Try cancelling all timers.
            std::size_t n_cancelled = 0;
            for (auto t : timers) n_cancelled += t->cancel();

            bytes_transferred = n;

            // If the read did not complete successfully, then store the error code.
            if (ec) error = ec;
            // If a timeout expired simultaneously to read completion, then report cancellation.
            else if (n_cancelled < timers.size()) error = boost::asio::error::operation_aborted;

            // Give any expired/cancelled timers a chance to run their handlers.
            boost::asio::post(std::move(*this));
        }

        // handler invoked by boost::asio::post() (phase 2)
        void operator()() {
            // At this point all timer completion handlers (from expiry or cancellation)
            // should have run.
            handler(error, bytes_transferred);
        }

        // Composed operations must preserve the executor of the user-supplied
        // completion handler for correctness.
        using executor_type = typename pick_exec::type;
        // Get the executor associated with the completion handler, or, if there is
        // none, the stream's I/O executor.
        executor_type get_executor() const noexcept {
            return pick_exec::get(handler, stream.get_executor());
        }

        // Preserving the allocator of the user-supplied completion handler isn't
        // required, but you should anyways.
        using allocator_type = typename pick_alloc::type;
        // Get the allocator associated with the completion handler, or, if there is
        // none, the default allocator.
        allocator_type get_allocator() const noexcept {
            return pick_alloc::get(handler);
        }
    };

    completion_type completion(handler);

    for (auto t : array_type{&timers...})
        t->async_wait(boost::asio::bind_executor(
            pick_exec::get(completion.completion_handler, stream.get_executor()),
            [&stream](const boost::system::error_code& ec) mutable {
                if (ec == boost::asio::error::operation_aborted) return;
                stream.cancel();
            }));

    boost::asio::async_read(stream, buffers, read_completion_handler{ stream,
        boost::system::error_code(), 0, {&timers...}, std::move(completion.completion_handler)});
    return completion.result.get();
}

dudamoos commented 5 years ago

This issue hasn't been responded to in a while, so I'll close my involvement with this:

I do believe that this issue should be addressed if the implementation is reasonable. As shown in the Stack Overflow question linked to in the opening post, there are cases where Asio doesn't handle speculative operations correctly where it would otherwise work fine (such as on device files). That's why I didn't close this issue. However, as discussed above, I have a different (and arguably better) solution to my particular problem, so I no longer need this fix.

I'll leave it up to Chris whether or not to keep this issue open.

vinniefalco commented 5 years ago

It has only been 24 days since you opened the issue! That's practically no time at all...

dudamoos commented 5 years ago

I didn't necessarily expect it to be fixed quickly if Chris was interested in addressing it. I just haven't seen him express interest in addressing it right now.