grantila / q

A platform-independent promise library for C++, implementing asynchronous continuations.
http://libq.io
Apache License 2.0

equivalent to async #26

Open maoravni opened 6 years ago

maoravni commented 6 years ago

What is the LibQ equivalent to std::async? More specifically:

std::future< int > f = std::async( [ ]( ){ /* ... */ return 42; } );
int i = f.get( );

where the calling thread blocks until f.get() returns a value?

grantila commented 6 years ago

This is two questions.

The first is the equivalent to std::async, which runs a function in a new thread (or potentially a re-used thread, or potentially later, after other tasks have finished - basically you have no control over it).

In q, you can either schedule a task on an existing threadpool (by associating a scheduler+queue with it), or run a function in a newly created thread using q::run, which requires a name (the thread will be named): auto prom = q::run( "my-thread", [ ]( ){ return 42; } );.

The second question is how to block-read a promise value. There is no .get() in q, as I consider blocking reads to generally be an anti-pattern which you should try to avoid. You can however create a blocking dispatcher to provide the same functionality:

auto ec = q::make_execution_context< q::blocking_dispatcher >( "blocking-read" );

int n;
prom
    // copy return value from prom to outer scope
    .then( [ &n ]( int val ){ n = val; } )
    // stop the blocking dispatcher
    .then(
        [ ec ]( ){ ec->dispatcher( )->terminate( q::termination::linger ); },
        ec->queue( )
    );

ec->dispatcher( )->start( ); // This will block until it's terminated (above)

// n is now 42

Do note that if the thread function throws an exception, the returned promise (prom) will contain this exception, and unless you attach a .fail( ) or .finally( ) handler, the above is unsafe. Check the "hello world" example here.

maoravni commented 6 years ago

My use-case for using the library was for implementing a multi-producer single-consumer system. Having a block-read is one of the requirements. Re-use is also a primary requirement, as this is a performance-critical system. I wouldn't want to fire up a thread and then terminate it for a single task.

Is there another way of enqueuing tasks/continuations into a thread, blocking, and then extracting the return data from the promise?

grantila commented 6 years ago

Why would blocking read be a requirement? Sequential handling of completions I can understand, but nothing ever really needs to block.

If it's performance critical and you don't want to fire up threads for a single task, note that what std::async does here is unspecified - it might do exactly that. You probably want this instead:

You put tasks on a queue which is bound to a thread pool - and allow them to execute as soon as possible. You read the results of these concurrently-executing tasks on a single "main" thread (so the results are read and handled sequentially).

// Make blocking "main" execution context
auto ec_main = q::make_execution_context< q::blocking_dispatcher >( "main" );
// Make an execution context for a threadpool with 4 threads
// This context's _completion_ is scheduled on the blocking dispatcher's queue...
auto ec_pool = q::make_execution_context< q::threadpool, q::direct_scheduler >( "main background pool", ec_main->queue( ), 4 );

// This is a function which should run in the threadpool:
int some_function( ); // defined somewhere else

// Add task "fn" to the threadpool:
auto promise = q::make_promise( ec_pool->queue( ), some_function );

promise.then(
    [ ]( int value ){ /* do something */ },
    ec_main->queue( ) // Will run on the blocking thread
);

ec_main->dispatcher( )->start( ); // Will block

However, if you're going to continuously read results from background tasks and handle the results, consider using a q::channel. It has a backlog which you can likely ignore unless you want to handle upstream pressure.

// Create a channel with the blocking dispatchers' queue as default.
// Backlog of 5, can be ignored unless you need to handle upstream pressure
q::channel< int > ch( ec_main->queue( ), 5 );

auto readable = ch.get_readable( );
auto writable = ch.get_writable( );

auto completion = readable->consume(
    // Will be called for each completed job and run sequentially because
    // this function is called on the blocking dispatcher's "main" thread
    [ ]( int value ){ /* handle value */ }
);
// When all tasks are done, close the writable:
writable.close( );
// The result of consume() is a promise which will be resolved when the writable is closed
completion.then( [ ]( ){ std::cout << "Done" << std::endl; } );

Each background task can then capture writable and call writable.write( 42 ) when it finishes. These integer values (in this example) will be written thread-safely to the channel and read sequentially by the consumer.