chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio
4.94k stars 1.22k forks source link

Bug report: a potential bug is triggered when the IOCP is out of resources. #312

Open mike07026 opened 6 years ago

mike07026 commented 6 years ago

In short, the mechanism for handling the case where the IOCP is out of resources is flawed: in some extreme cases, io_context (io_service) will dispatch an op with the wrong result.

Code version: Asio in Boost 1.67.0. Platform: Windows with IOCP.

Details: In asio\detail\impl\win_iocp_io_context.ipp, completed_ops_ is a fallback queue used when the IOCP is out of resources. In that case the op is pushed onto completed_ops_, but the key piece of information — that it was posted with the completion key "overlapped_contains_result", i.e. that its result is already stored in the OVERLAPPED structure — is lost.

// Completes `op` with the given error code and byte count. The result is
// packed into the op's OVERLAPPED fields (Internal = pointer to the error
// category, Offset = error value, OffsetHigh = bytes transferred). The op is
// then posted to the completion port under the key overlapped_contains_result,
// which tells do_one() to read the result back out of the OVERLAPPED
// structure instead of using what GetQueuedCompletionStatus reports.
//
// NOTE(review): known defect (asio issue #312). On the out-of-resources
// fallback below, only the op pointer is queued on completed_ops_; the
// overlapped_contains_result key is not recorded anywhere. do_one() later
// hands completed_ops_ to post_deferred_completions() (not shown). Per the
// issue report, that re-posts the op without this key, so do_one() overwrites
// the stored result with whatever GetQueuedCompletionStatus reports for the
// re-post. Fixing this needs a per-op marker that survives the detour; it
// cannot be done inside this function alone.
void win_iocp_io_context::on_completion(win_iocp_operation* op,
    const boost::system::error_code& ec, DWORD bytes_transferred)
{
  // Flag that the operation is ready for invocation.
  op->ready_ = 1;

  // Store results in the OVERLAPPED structure.
  op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
  op->Offset = ec.value();
  op->OffsetHigh = bytes_transferred;

  // Enqueue the operation on the I/O completion port.
  if (!::PostQueuedCompletionStatus(iocp_.handle,
        0, overlapped_contains_result, op))
  {
    // Out of resources. Put on completed queue instead.
    // NOTE(review): the overlapped_contains_result key is dropped here, so
    // the stored result may be lost on re-post (see function header).
    mutex::scoped_lock lock(dispatch_mutex_);
    completed_ops_.push(op);
    ::InterlockedExchange(&dispatch_required_, 1);
  }
}

After that, the main loop in size_t win_iocp_io_context::do_one(DWORD msec, boost::system::error_code& ec) calls post_deferred_completions to pop the op from completed_ops_ and enqueue it on the IOCP again. But because the op's "overlapped_contains_result" key has been lost, the following code does not know that the op already contains its result, so the op is treated as a "no result" op and is dispatched with a new, wrong result.

// Waits for and dispatches at most one completed operation.
//
// Loops until one of these happens:
//  - a ready op is dispatched: returns 1, ec cleared;
//  - GetQueuedCompletionStatus fails with an error other than WAIT_TIMEOUT:
//    returns 0, ec set to that error;
//  - a finite wait (msec != INFINITE) times out: returns 0, ec cleared;
//  - a stop event is dequeued while stopped_ is set: returns 0, ec cleared,
//    or set if re-posting the stop event for the next thread fails.
// Otherwise it keeps looping: the op is not ready yet, a wake_for_dispatch
// packet arrived, an INFINITE wait hit the gqcs_timeout_ cap, or a leftover
// stop event was seen while not stopped.
//
// msec: maximum wait in milliseconds (INFINITE to wait for a real handler).
// ec:   out-parameter receiving the error, if any.
size_t win_iocp_io_context::do_one(DWORD msec, boost::system::error_code& ec)
{
  for (;;)
  {
    // Try to acquire responsibility for dispatching timers and completed ops.
    // Only the thread whose compare-exchange flips dispatch_required_ from 1
    // to 0 does this work.
    if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
    {
      mutex::scoped_lock lock(dispatch_mutex_);

      // Dispatch pending timers and operations.
      // NOTE(review): completed_ops_ may hold ops that on_completion() queued
      // after failing to post them with overlapped_contains_result. Per asio
      // issue #312, post_deferred_completions() (not shown) re-posts them
      // without that key, so the else-branch below overwrites their stored
      // result.
      op_queue<win_iocp_operation> ops;
      ops.push(completed_ops_);
      timer_queues_.get_ready_timers(ops);
      post_deferred_completions(ops);
      update_timeout();
    }

    // Get the next operation from the queue.
    DWORD bytes_transferred = 0;
    dword_ptr_t completion_key = 0;
    LPOVERLAPPED overlapped = 0;
    // Clear the thread's last error so last_error reads 0 when the call
    // succeeds without setting it.
    ::SetLastError(0);
    // The wait is capped at gqcs_timeout_ even for INFINITE, so the loop
    // periodically comes back to the dispatch check above.
    BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
        &bytes_transferred, &completion_key, &overlapped,
        msec < gqcs_timeout_ ? msec : gqcs_timeout_);
    DWORD last_error = ::GetLastError();

    if (overlapped)
    {
      win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
      boost::system::error_code result_ec(last_error,
          boost::asio::error::get_system_category());

      // We may have been passed the last_error and bytes_transferred in the
      // OVERLAPPED structure itself.
      // NOTE(review): the decision is made purely from the dequeued key; an op
      // whose result was stored but which was re-posted under another key is
      // indistinguishable from one with no stored result (asio issue #312).
      if (completion_key == overlapped_contains_result)
      {
        result_ec = boost::system::error_code(static_cast<int>(op->Offset),
            *reinterpret_cast<boost::system::error_category*>(op->Internal));
        bytes_transferred = op->OffsetHigh;
      }

      // Otherwise ensure any result has been saved into the OVERLAPPED
      // structure.
      else
      {
        op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
        op->Offset = result_ec.value();
        op->OffsetHigh = bytes_transferred;
      }

      // Dispatch the operation only if ready. The operation may not be ready
      // if the initiating function (e.g. a call to WSARecv) has not yet
      // returned. This is because the initiating function still wants access
      // to the operation's OVERLAPPED structure.
      // If it is not ready, the compare-exchange sets ready_ to 1 and the loop
      // continues without dispatching.
      if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
      {
        // Ensure the count of outstanding work is decremented on block exit.
        work_finished_on_block_exit on_exit = { this };
        (void)on_exit;

        op->complete(this, result_ec, bytes_transferred);
        ec = boost::system::error_code();
        return 1;
      }
    }
    else if (!ok)
    {
      // No packet was dequeued: either a real error or the wait timed out.
      if (last_error != WAIT_TIMEOUT)
      {
        ec = boost::system::error_code(last_error,
            boost::asio::error::get_system_category());
        return 0;
      }

      // If we're waiting indefinitely we need to keep going until we get a
      // real handler.
      if (msec == INFINITE)
        continue;

      ec = boost::system::error_code();
      return 0;
    }
    else if (completion_key == wake_for_dispatch)
    {
      // We have been woken up to try to acquire responsibility for dispatching
      // timers and completed operations.
    }
    else
    {
      // A packet with no OVERLAPPED and some other key is treated as a stop
      // event.
      // Indicate that there is no longer an in-flight stop event.
      ::InterlockedExchange(&stop_event_posted_, 0);

      // The stopped_ flag is always checked to ensure that any leftover
      // stop events from a previous run invocation are ignored.
      if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
      {
        // Wake up next thread that is blocked on GetQueuedCompletionStatus.
        if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
        {
          if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
          {
            last_error = ::GetLastError();
            ec = boost::system::error_code(last_error,
                boost::asio::error::get_system_category());
            return 0;
          }
        }

        ec = boost::system::error_code();
        return 0;
      }
    }
  }
}

A simple solution: I think adding a member to the op that records its completion key would fix this bug.

Could you please take a look? Thank you in advance. @chriskohlhoff

ljluestc commented 1 week ago

#include <boost/asio.hpp>
#include <iostream>
#include <map>
#include <mutex>
#include <condition_variable>
#include <atomic>

class win_iocp_operation {
public:
    // Marker recording the completion key under which this operation's result
    // was stored. The value overlapped_contains_result means Internal / Offset /
    // OffsetHigh hold the result (error category pointer, error value, bytes
    // transferred). Keeping it on the op, rather than only in the key passed
    // to PostQueuedCompletionStatus, lets the marker survive a detour through
    // completed_ops_, where that key is lost (asio issue #312).
    //
    // ULONG_PTR because IOCP completion keys are pointer-sized; a DWORD would
    // truncate them on 64-bit Windows. Zero-initialized so that ops which
    // never pass through on_completion() (e.g. kernel-completed I/O) do not
    // carry a garbage marker into do_one().
    // NOTE(review): any reset()/reuse path for ops must clear this as well.
    ULONG_PTR completion_key = 0;

    // Other members remain unchanged...

    // Readiness flag (1 = may be dispatched). It is updated with
    // InterlockedCompareExchange / plain stores, and the Interlocked API takes
    // a LONG volatile*, so this must be a plain long, not std::atomic<int>.
    long ready_ = 0;

    // Result storage; these mirror the OVERLAPPED fields of the same names.
    // NOTE(review): static_cast<win_iocp_operation*>(LPOVERLAPPED) in do_one()
    // is only valid if the real class derives from OVERLAPPED, which then
    // supplies these fields itself.
    ULONG_PTR Internal = 0;
    DWORD Offset = 0;
    DWORD OffsetHigh = 0;

    // Constructor and other member functions...
};

class win_iocp_io_context {
public:
    // Existing methods...

    void on_completion(win_iocp_operation* op,
        const boost::system::error_code& ec, DWORD bytes_transferred) {

        // Flag that the operation is ready for invocation.
        op->ready_ = 1;

        // Store results in the OVERLAPPED structure.
        op->Internal = reinterpret_cast<ULONG_PTR>(&ec.category());
        op->Offset = ec.value();
        op->OffsetHigh = bytes_transferred;

        // Attempt to post to the completion port
        if (!::PostQueuedCompletionStatus(iocp_.handle,
            0, op->completion_key, op)) {
            // Out of resources, put on completed queue instead
            std::lock_guard<std::mutex> lock(dispatch_mutex_);
            completed_ops_.push(op);
            ::InterlockedExchange(&dispatch_required_, 1);
        }
    }

    size_t do_one(DWORD msec, boost::system::error_code& ec) {
        for (;;) {
            // Try to acquire responsibility for dispatching timers and completed ops.
            if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1) {
                std::lock_guard<std::mutex> lock(dispatch_mutex_);

                // Dispatch pending timers and operations.
                op_queue<win_iocp_operation> ops;
                ops.push(completed_ops_);
                timer_queues_.get_ready_timers(ops);
                post_deferred_completions(ops);
                update_timeout();
            }

            // Get the next operation from the queue.
            DWORD bytes_transferred = 0;
            LPOVERLAPPED overlapped = nullptr;
            ::SetLastError(0);
            BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
                &bytes_transferred, &completion_key, &overlapped,
                msec < gqcs_timeout_ ? msec : gqcs_timeout_);
            DWORD last_error = ::GetLastError();

            if (overlapped) {
                win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
                boost::system::error_code result_ec(last_error,
                    boost::asio::error::get_system_category());

                // We may have been passed the last_error and bytes_transferred in the OVERLAPPED structure itself.
                if (op->completion_key == overlapped_contains_result) {
                    result_ec = boost::system::error_code(static_cast<int>(op->Offset),
                        *reinterpret_cast<boost::system::error_category*>(op->Internal));
                    bytes_transferred = op->OffsetHigh;
                }

                // Otherwise ensure any result has been saved into the OVERLAPPED structure.
                else {
                    op->Internal = reinterpret_cast<ULONG_PTR>(&result_ec.category());
                    op->Offset = result_ec.value();
                    op->OffsetHigh = bytes_transferred;
                }

                // Dispatch the operation only if ready.
                if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) {
                    // Ensure the count of outstanding work is decremented on block exit.
                    work_finished_on_block_exit on_exit = { this };
                    (void)on_exit;

                    op->complete(this, result_ec, bytes_transferred);
                    ec = boost::system::error_code();
                    return 1;
                }
            }
            // Handle other cases...
        }
    }

private:
    // Members for IOCP handling...
    std::mutex dispatch_mutex_;
    // Other members remain unchanged...
};