oneapi-src / oneTBB

oneAPI Threading Building Blocks (oneTBB)
https://oneapi-src.github.io/oneTBB/
Apache License 2.0

Guidance on assisting tbb cancellation? Feels like API is lacking #406

Open aggieNick02 opened 3 years ago

aggieNick02 commented 3 years ago

When an exception is thrown inside a TBB algorithm, or cancellation is otherwise requested, the algorithm may fail to terminate if the code it is running is blocked and will not yield. Often all it takes to fix this is running some algorithm-specific code that assists the cancellation. Is there some part of the API that allows registering a callback for when cancellation is requested? If not, what is the right way to handle this? Consider the following scenario.

We have a TBB flow graph with a single input node and a subsequent function node. The input node just calls pop() on a concurrent_bounded_queue to get the next input.

When an exception is thrown by the function node, the input node is stuck in pop() waiting for another queue entry. The exception causes cancellation of the flow graph to be requested, but the flow graph must wait for the pop() to complete, and it won't.

What needs to happen is that, when cancellation is requested, any pending (or future) queue pop() operations stop and throw. The only way I can find in the API to check whether cancellation was requested is the deprecated root_task() function, and I have to poll its context's is_group_execution_cancelled() value to find out when that happens. This feels really awkward. I would expect to be able to register a callback on any cancellation request to assist in the cancellation process.

Full code follows. Hoping I am just missing something. Thanks.

#include <iostream>
#include "tbb/flow_graph.h"
#include "tbb/concurrent_queue.h"
#include <atomic>
#include <chrono>
#include <exception>
#include <thread>
using namespace tbb::flow;
typedef tbb::concurrent_bounded_queue<int> queue_type;

// Source body: blocks in pop() until the queue has an element (or is aborted).
struct input_body {
    input_body(queue_type& queue) : m_queue(queue) {}
    int operator()(tbb::flow_control& fc)
    {
        int i;
        m_queue.pop(i);
        return i;
    }
    queue_type& m_queue;
};

// Simulates a long-running body that eventually fails.
struct do_stuff_body {
    int operator()(int v) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10000));
        throw std::exception();
    }
};

int main() {
    queue_type queue;
    std::atomic<bool> done{false};

    graph g;
    input_node<int> input(g, input_body(queue));
    function_node<int, int> do_stuff(g, serial, do_stuff_body());
    make_edge(input, do_stuff);

    // Workaround: poll the graph's context and abort the queue once it is cancelled,
    // so the pop() blocked in input_body throws and the graph can finish.
    std::thread t([&] {
        while (!done)
        {
#pragma warning(disable: 4996) // root_task() is deprecated (MSVC warning C4996)
            if (g.root_task()->context()->is_group_execution_cancelled())
            {
                queue.abort();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        });
    queue.push(1);
    input.activate();
    try
    {
        g.wait_for_all();
    }
    catch (const std::exception& e)
    {
        std::cout << e.what();
    }
    // Stop and join the polling thread: destroying a joinable std::thread calls std::terminate.
    done = true;
    t.join();
    return 0;
}
fourierwu commented 3 years ago

I faced the same problem. I changed the graph from a tbb::flow::graph object to a tbb::flow::graph* allocated with new, and it works, like below:

tbb::flow::graph* graph_tbb = new tbb::flow::graph();
tbb::flow::input_node src(*graph_tbb, [&](tbb::flow_control& fc) -> int { int i = 1; return i; });
tbb::flow::function_node<int, int> func1(*graph_tbb, 1, [](int i) -> int { std::cout << i << "\n"; return i; });
tbb::flow::make_edge(src, func1);
src.activate();

The official doc is ridiculous.

ivankochin commented 3 years ago

@aggieNick02

If I understand your issue correctly, you could pass a custom tbb::task_group_context instance to the graph constructor and then check tbb::task_group_context::is_group_execution_cancelled() before calling abort(). Something like this:

// ...

int main() {
    queue_type queue;
    tbb::task_group_context context{};
    tbb::flow::graph g{context};
    // ...
    std::thread t([&] {
        while (true)
        {
            if (context.is_group_execution_cancelled())
            {
                queue.abort();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    });
}
aggieNick02 commented 3 years ago

Right, so something like that will work, but it feels really awkward and not in the spirit of TBB. I have to spawn a thread manually and poll constantly to figure out if the TBB flow graph needs help shutting down.

It would be a lot more natural, and would not require a separate thread and polling, if TBB allowed nodes and/or a graph to specify a callback that assists with their shutdown when an exception is thrown. This is basically what I ended up implementing manually: every node body is wrapped in a try/catch/rethrow, and the catch calls a function that invokes the callbacks implemented by any node that may need to shut down blocking resources in the event of an exception. It works well, but it is pretty verbose since it is not baked into the API.

BenFrantzDale commented 2 years ago

Related: https://wg21.link/P2300 (std::execution) has its own approach, plumbing exceptions and cancellation through the pipeline.

alexey-katranov commented 2 years ago

Let me simplify the example a bit:

tbb::concurrent_bounded_queue<int> queue;
tbb::task_group tg;
tg.run([&] {
    throw std::exception{};
});
tg.run_and_wait([&] {
    int value;
    queue.pop(value); // it is not aborted when the exception is thrown
});

If I understand the initial question correctly, we want queue.pop() to be cancelled when the exception is thrown. Under the current design, concurrent containers such as concurrent_bounded_queue are completely independent of the task scheduler and cannot observe the exception. I do not think we need this connection in the general case, but what do you think about being able to register a callback on exception? E.g.,

tbb::concurrent_bounded_queue<int> queue;
tbb::task_group_context ctx;
ctx.set_on_exception_callback([&] { // proposed, hypothetical API
    queue.abort(); // abort `queue.pop()` in the case of an exception
});
tbb::task_group tg{ctx};
tg.run([&] {
    throw std::exception{};
});
tg.run_and_wait([&] {
    int value;
    queue.pop(value);
});

P.S. The example is not fully oneTBB-compliant because it is not guaranteed that at least one worker thread exists to process a task spawned with tg.run(...).