cplusplus / sender-receiver

Issues list for P2300
Apache License 2.0

integration of `run_loop` with other event loops #172

Open ericniebler opened 11 months ago

ericniebler commented 11 months ago

The run member function of run_loop consumes the current thread until the run loop finishes. There should be a way to tell the run_loop to delegate execution to a separate event loop, like a system event loop.

I can imagine users calling run_loop::run with an optional callable, which run will call periodically; maybe after every item that run processes.

```c++
void run_loop::run( callable auto fn ) {
  while( auto item = pop_front() ) {
    item.execute();
    if( !fn( [this]() -> bool { return !this->_is_empty(); } ) )
      break;
  }
}
```

run invokes the callable with a predicate that returns true while the run loop's queue still has work. We can use this to drive two run_loops on the same thread:

```c++
run_loop loop1, loop2;

loop1.run(
  [&](auto has_work) {
    // Run loop2 while loop1 is empty
    loop2.run( [&](auto) { return !has_work(); } );
    return true;
  }
);
```
kirkshoop commented 11 months ago

This looks nice for the case when the run_loop owns the thread.

A different interface is needed to dispatch run_loop on a thread owned by a different framework (UI etc..)

I have used two operations, each returning a sender:

`has_work()` and `run_some()`

has_work returns a sender that will either 1. complete when work is ready or has been added, or 2. complete inline with the time_point of the next time-scheduled item. This allows the run-loop driver to wait if it owns the thread, or to set a 'timer' in the framework that does own the thread.

run_some returns a sender that dispatches some items inline within start(). It uses a stop_token from the receiver to break out of the inline dispatch loop.

raphaelgz commented 11 months ago

If I may put my two cents in, another possibility is to offer an interface similar to that of Asio's io_context, something like:

- `bool run_loop::run_one()`: runs at most one queued item; if the queue is empty, it returns false;
- `void run_loop::run()`: equivalent to:
```c++
while (run_one())
  ;
```
That way it is possible to compose multiple event loops instead of delegating execution through run_loop:

```c++
QEventLoop qtLoop;
std::execution::run_loop loop;
int fd = epoll_create1(EPOLL_CLOEXEC);
::epoll_event ev;
bool stop = false;

while (!stop) {
    qtLoop.processEvents(QEventLoop::AllEvents, 1000);
    epoll_wait(fd, &ev, 1, 1000);
    loop.run_for(std::chrono::seconds(1)); // or loop.poll();
}
```
BenFrantzDale commented 2 months ago

This may not be the direction you want to take this item, but coming from tbb, I'm used to "blocking" operations permitting the calling thread to participate. Fundamentally this pattern is what makes tbb composable. My understanding is that a blocking operation has the calling thread queue up work in its own thread-local queue and then start consuming that work, while other workers wake up and can also start stealing from that queue. As a result, while you can wind up with only one thread doing work, you don't wind up deadlocking.

Here's an example of tbb not deadlocking where sync_wait does: https://godbolt.org/z/P9sqvYrT9

Obviously coroutines are a way around this (with only one sync_wait in the whole program), but that's not realistic in lots of cases.

I feel like stdexec would do well with a flavor of sync_wait that also takes a scheduler, where internally it combines the passed-in scheduler with a run_loop to make a work-stealing scheduler. This is a bit of a handwave, but I'm picturing internally it would be like

```c++
auto sync_wait_with_scheduler(scheduler sch, Sender&& sndr) {
    state_t state {};
    run_loop loop;
>   // A scheduler that will queue up work for `loop` but that `sch` can steal from:
>   stealing_scheduler combined{loop.get_scheduler(), sch};

    // Launch the sender with a continuation that will fill in a variant
    // and notify a condition variable.
    auto op_state =
>       connect((Sender&&) sndr, __receiver_t<Sender>{&state, &combined});
    start(op_state);

    // Wait for the variant to be filled in.
>   // Even if this thread is the one and only thread of `sch`, we should make forward progress.
>   combined.run();
    ...
```

There would still be footguns, such as when the sender explicitly transfers work onto sch when the caller is already part of sch, but I think something like this would allow at least simple uses of sync_wait to not worry about deadlocking. Thoughts?

LeeHowes commented 2 months ago

This is the purpose of forward progress delegation. Let's say you sync_wait on some task running on scheduler S, S can query the delegation scheduler from the receiver and get sync_wait's internal scheduler. S can then push work onto it (or with appropriate specialization internally do something more intelligent).

BenFrantzDale commented 2 months ago

> This is the purpose of forward progress delegation. Let's say you sync_wait on some task running on scheduler S, S can query the delegation scheduler from the receiver and get sync_wait's internal scheduler. S can then push work onto it (or with appropriate specialization internally do something more intelligent).

Are you saying sync_wait should be using the get_delegation_scheduler machinery to prevent deadlock? It must not be since my godbolt link easily deadlocks.

LeeHowes commented 2 months ago

No, the scheduler that runs the work you sync_wait on needs to do it. The underlying scheduler (say TBB) needs to know how to delegate work to the calling thread, because that's where the expertise is.

Right now none of this is implemented in run_loop. I hope that a system scheduler or TBB scheduler will do it. All I'm saying is that the fundamental hooks are in P2300, we just have to use them, which is QOI.

BenFrantzDale commented 2 months ago

> No, the scheduler that runs the work you sync_wait on needs to do it. The underlying scheduler (say TBB) needs to know how to delegate work to the calling thread, because that's where the expertise is.
>
> Right now none of this is implemented in run_loop. I hope that a system scheduler or TBB scheduler will do it. All I'm saying is that the fundamental hooks are in P2300, we just have to use them, which is QOI.

I see. I'll have a look at that get_delegation_scheduler functionality. Thanks.

lewissbaker commented 2 months ago

Notwithstanding the get_delegation_scheduler functionality, I do think that we could do with a sync_wait() overload that takes a driveable_context and that drives that context (and propagates it as the delegation scheduler) instead of creating a new run_loop.

I don't think we have the right API design for run_loop to support that at the moment, though.

BenFrantzDale commented 1 month ago

> Notwithstanding the get_delegation_scheduler functionality, I do think that we could do with a sync_wait() overload that takes a driveable_context and that drives that context (and propagates it as the delegation scheduler) instead of creating a new run_loop.
>
> I don't think we have the right API design for run_loop to support that at the moment, though.

OK, I think (?) I understand roughly what delegation schedulers are for. (Although [exec.get.delegation.scheduler] still feels a bit cryptic to me.) But at the end of the day, if I follow correctly, sync_wait is a deadlock footgun: the default scheduler is a run_loop, so if the sender passed to sync_wait schedules work on the pool that the calling thread belongs to, and that happens recursively, eventually the caller will be the only available thread; it will schedule work, go back to its run_loop, and deadlock. Is that correct?

BenFrantzDale commented 1 month ago

I keep thinking about this 'cause it feels like the missing link for writing code where I don't worry about sync_wait messing everything up. If I'm using a tbb thread pool, and if I implemented my own sync_wait based on tbb::task_group in place of stdexec::run_loop, I think I would get the desired behavior of being able to nest calls to sync_wait(schedule(sch) | ...) without worrying about deadlock. But that's not a solution for general users of sync_wait.