chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio

How can you use a custom reactor implementation with boost::asio? #1264

Open blackbox-tech opened 1 year ago

blackbox-tech commented 1 year ago

I'm trying to integrate boost::asio with a third-party event loop that will manage all my fds and timers etc. in a manner that is similar to epoll. I've looked at the documentation as well as the existing reactors/services in Boost, such as the epoll_reactor and the io_uring_service, and it's not obvious to me how to tell boost::asio to use a different reactor/service at run-time (i.e. other than the default ones that are typedef'd at compile time in reactor.hpp). I certainly have all the functionality I need for a reactor implementation with this third-party event loop, I just can't figure out how to plug it in and use it with boost::asio.

I see that it is possible to create a custom service using boost::asio::use_service(), but I can't find any example that implements a custom reactor, or uses any kind of alternative event-loop implementation. It also appears that io_service is deprecated in favour of io_context now? Is it possible to point me to an example of how to integrate your own reactor service into boost::asio, or, if this is not possible in the current design, can you suggest a clean way for me to include the io_context so that it has a different typedef for its reactor type?
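
(For context, creating and locating a plain custom service with use_service() looks roughly like the minimal sketch below; my_service is just a placeholder name, and a reasonably recent Boost where services derive from execution_context::service is assumed. The open question is how to make such a service act as the reactor.)

#include <boost/asio.hpp>

// Minimal custom-service sketch (does not replace the reactor the io_context runs).
class my_service : public boost::asio::execution_context::service {
public:
    // Identifier used by use_service<> to locate the service within a context.
    static boost::asio::execution_context::id id;

    explicit my_service(boost::asio::execution_context& ctx)
        : boost::asio::execution_context::service(ctx) {}

private:
    // Called when the owning context is shut down.
    void shutdown() override {}
};

boost::asio::execution_context::id my_service::id;

int main() {
    boost::asio::io_context io_ctx;
    // First call constructs the service; later calls return the same instance.
    my_service& svc = boost::asio::use_service<my_service>(io_ctx);
    (void)svc;
}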

Many thanks.

vinipsmaker commented 1 year ago

Asio is a huge framework. For the concern at hand, we may see it as a collection of the following components:

So we can mix and match many components in Boost.Asio. We can use some Boost.Asio algorithms to work on top of our own execution contexts, but we cannot customize asio::io_context to use different reactors (or proactors).

However, you can use asio::io_context within other reactors: just call asio::io_context::poll() (instead of asio::io_context::run()) periodically within your reactor loop (e.g. at every event-loop tick).
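
A rough sketch of that shape (the on_tick hook on the foreign loop is hypothetical, just to show where the asio calls would go):

#include <boost/asio/io_context.hpp>

// Drive asio from a foreign event loop without ever blocking it.
// `loop.on_tick(cb)` is a hypothetical "call cb on every loop iteration" hook.
template <typename ForeignLoop>
void drive_from_foreign_loop(boost::asio::io_context& io_ctx, ForeignLoop& loop) {
    loop.on_tick([&io_ctx] {
        io_ctx.restart();  // clear the stopped state left when asio runs out of work
        io_ctx.poll();     // run all handlers that are ready, then return immediately
    });
}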

Alternatively, you may integrate the foreign event loop within Boost.Asio. For instance, you can do this for GTK+'s GLib. It seems to be possible to do this with Qt as well (there are even examples around), but I haven't tried with Qt myself, so I can't comment much.

blackbox-tech commented 1 year ago

@vinipsmaker Thank you for your suggestions, they were extremely helpful. These are my current thoughts regarding possible solutions on linux...

As I understand it, there are 2 possible approaches to integrating asio with a 3rd-party event loop within the same thread.

1) You can run the 3rd-party event loop using asio's event loop if it exposes an fd to you. In the GLib example, asio adds GLib's epoll_fd inside the fd set of its own epoll_fd. (Unfortunately in my case, the 3rd-party event loop is actually Python's internal asyncio event loop, so it's not possible to extract the epoll_fd that it uses internally.)

2) You can dispatch asio using the 3rd-party event loop instead of calling asio::io_context::run(). I can see 2 ways to do this:
a) As you suggest, I could call asio::io_context::poll() repeatedly from the 3rd-party event loop. This would turn the thread running the event loop into a busy wait, i.e. spinning the CPU instead of being idle, which is not ideal in my use case.
b) To avoid spinning, you could use the 3rd-party event loop to poll asio's epoll_fd_, calling asio::io_context::poll() whenever it becomes readable. (Effectively the reverse of the GLib example in the 1st solution.)

I think that 2b is the cleanest solution for my use case, because I don't want to spin but I do care about latency (enough that I want to avoid the delay of calling asio::io_context::poll() on a timer).

My solution might look somewhat abusive, as asio does not expose the epoll_fd that it uses via its interface, but I have managed to safely extract the epoll_fd_ from the reactor implementation detail using Dave Abrahams' stowed-pointer template trick:

#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/io_context.hpp>

// The "stowed pointer" trick: an explicit template instantiation is allowed to
// name a private member, so we can grab a pointer-to-member for epoll_fd_
// without modifying Boost.
template<class Tag> struct stowed { static typename Tag::type value; };
template<class Tag> typename Tag::type stowed<Tag>::value;
template<class Tag, typename Tag::type x> struct stow_private { stow_private() { stowed<Tag>::value = x; } static stow_private instance; };
template<class Tag, typename Tag::type x> stow_private<Tag, x> stow_private<Tag, x>::instance;

// Tag describing the private member we want: int epoll_reactor::epoll_fd_
struct EpollFD { typedef int(boost::asio::detail::epoll_reactor::*type); };
template class stow_private<EpollFD, &boost::asio::detail::epoll_reactor::epoll_fd_>;

// A utility function to get the epoll_fd from an io_context (linux specific)
int get_epoll_fd(boost::asio::io_context& io_ctx) {
    auto& reactor = boost::asio::use_service<boost::asio::detail::epoll_reactor>(io_ctx);
    return reactor.*stowed<EpollFD>::value;
}
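
As a rough sketch of the intended usage, the extracted fd would then be handed to the other event loop (the add_reader hook here is hypothetical):

// Whenever asio's epoll fd becomes readable, drain the ready handlers
// without blocking. `loop.add_reader(fd, cb)` is a hypothetical API.
template <typename EventLoop>
void attach(boost::asio::io_context& io_ctx, EventLoop& loop) {
    loop.add_reader(get_epoll_fd(io_ctx), [&io_ctx] {
        io_ctx.restart();
        while (io_ctx.poll() > 0) {}
    });
}
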
vinipsmaker commented 1 year ago

Unfortunately in my case, the 3rd-party event loop is actually Python's internal asyncio event loop, so it's not possible to extract the epoll_fd that it uses internally

Well, @philoinovsky worked as a student for GSoC 2021 to integrate Boost.Asio and Python's asyncio. You may find his old work at https://github.com/BoostGSoC21/python.

The work also involved research, and with any research comes risk. Unfortunately we found (as part of the research phase) that Python made it difficult to develop a general solution.

If you're constrained to a single thread, you can develop a solution that will work for you. If you want some library maintained in Boost (or close enough), you'll have to wait until Python stops sharing the same GIL among all Python subinterpreters (why do separate VMs that share no state need to share a GIL in the first place?). That way, it'd be possible to safely pass thread control back to the Python VM without the fear of blocking all threads of the application and risking a possible deadlock. It'd also be possible to have each Python subinterpreter constrained to its own Boost.Asio strand (the possibilities would be very interesting). That is something that I'd like to use myself as well. Unfortunately, Python subinterpreters right now are far from convincing.

My solution might look somewhat abusive, as asio does not expose the epoll_fd that it uses via its interface, but I have managed to safely extract the epoll_fd_ from the reactor implementation detail using Dave Abrahams' stowed-pointer template trick.

That's hacky, but it works. Just be mindful of the limitations:

Given you're a Python guy yourself, I'd like to ask you to ping me when/if Python developers change the subinterpreter implementation to stop using the GIL. I no longer follow Python development, so I wouldn't know. However, if they do implement this change, I could continue the work @philoinovsky started.

blackbox-tech commented 1 year ago

@vinipsmaker The Python event loop is just one of a few different event loops that I need to integrate with (but it's the one that does not expose an fd). The reason for Python is that my C++ library has some async Python bindings, so I need to call back into the user's interpreter; I can't create my own sub-interpreters, unfortunately. I don't follow Python development closely either, but personally I suspect the GIL is here to stay for a while, as the paradigm of using a GIL is fairly hardcoded into the design of many Python extensions, not just the built-in modules.

The approach that I will try to take in my library is to provide a generic function that binds an asio::io_context so that it can be run by any other event loop that supports polling an fd. The nice thing about using the stowed-pointer trick to get asio's epoll_fd is that it will fail at compile time if/when the Boost implementation detail changes. My application only needs to support Linux (hence epoll), but the same approach could be applied to other demultiplexers that use an fd, such as kqueue. I'm not really familiar with the new io_uring interface or Windows I/O completion ports, so I don't know what would and would not be possible there, to be honest.

blackbox-tech commented 1 year ago

@vinipsmaker If you're interested, I think I finally have something that works. I discovered that calling run() is not the same as calling poll(): specifically, run() clears the interruptor so that, when epoll_wait() is running with a timeout of -1, it can be woken up by other threads.

What I ended up implementing was something along these lines.


#include <boost/asio/detail/conditionally_enabled_mutex.hpp>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/io_context.hpp>
#include <cstddef>

// Same "stowed pointer" trick as above, used to reach the private members of
// asio's scheduler and epoll_reactor.
template<class Tag> struct stowed { static typename Tag::type value; };
template<class Tag> typename Tag::type stowed<Tag>::value;
template<class Tag, typename Tag::type x> struct stow_private { stow_private() { stowed<Tag>::value = x; } static stow_private instance; };
template<class Tag, typename Tag::type x> stow_private<Tag, x> stow_private<Tag, x>::instance;

struct SchedulerMutex { typedef boost::asio::detail::conditionally_enabled_mutex(boost::asio::detail::scheduler::*type); };
template class stow_private<SchedulerMutex, &boost::asio::detail::scheduler::mutex_>;

struct TaskInterrupted { typedef bool(boost::asio::detail::scheduler::*type); };
template class stow_private<TaskInterrupted, &boost::asio::detail::scheduler::task_interrupted_>;

struct EpollFD { typedef int(boost::asio::detail::epoll_reactor::*type); };
template class stow_private<EpollFD, &boost::asio::detail::epoll_reactor::epoll_fd_>;

// Extract the epoll fd that asio's reactor waits on (linux specific).
int get_epoll_fd(boost::asio::io_context& io_ctx) {
    auto& reactor = boost::asio::use_service<boost::asio::detail::epoll_reactor>(io_ctx);
    return reactor.*stowed<EpollFD>::value;
}

// Clear scheduler::task_interrupted_ so that the next handler posted from
// another thread interrupts the reactor via its event_fd, making asio's
// epoll fd readable and waking the outer event loop.
void clear_interruptor(boost::asio::io_context& io_ctx) {
    auto& scheduler = boost::asio::use_service<boost::asio::detail::scheduler>(io_ctx);
    auto& mutex = scheduler.*stowed<SchedulerMutex>::value;
    boost::asio::detail::conditionally_enabled_mutex::scoped_lock lock(mutex);
    scheduler.*stowed<TaskInterrupted>::value = false;
}

// Hand asio's epoll fd to the third-party event loop; whenever it becomes
// readable, drain all ready handlers without blocking.
void bind(boost::asio::io_context& io_ctx, CustomEventLoop& event_loop) {
    event_loop.poll(get_epoll_fd(io_ctx), [&](){
        std::size_t count;
        io_ctx.restart();
        do {
            count = io_ctx.poll();
        } while (count > 0);
        clear_interruptor(io_ctx);  // re-arm asio's interrupter so other threads can post to asio and wake epoll_wait
    });
}
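
For completeness, a minimal sketch of the interface that bind() assumes from CustomEventLoop (hypothetical, Linux/epoll-based, single-threaded, no error handling):

#include <sys/epoll.h>
#include <unistd.h>
#include <functional>
#include <vector>

// Hypothetical third-party loop: watches fds with its own epoll instance and
// invokes the registered callback whenever the corresponding fd is readable.
class CustomEventLoop {
public:
    CustomEventLoop() : epfd_(::epoll_create1(0)) {}
    ~CustomEventLoop() { ::close(epfd_); }

    // The interface used by bind() above: call cb whenever fd becomes readable.
    void poll(int fd, std::function<void()> cb) {
        callbacks_.push_back(std::move(cb));
        ::epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.u64 = callbacks_.size() - 1;  // index of the callback to run
        ::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
    }

    // Block in epoll_wait and dispatch callbacks as fds become readable.
    void run() {
        for (;;) {
            ::epoll_event ev{};
            if (::epoll_wait(epfd_, &ev, 1, -1) == 1)
                callbacks_[ev.data.u64]();
        }
    }

private:
    int epfd_;
    std::vector<std::function<void()>> callbacks_;
};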