grantila / q

A platform-independent promise library for C++, implementing asynchronous continuations.
http://libq.io
Apache License 2.0
193 stars · 24 forks

schedule task in a loop? #21

Closed deblauwetom closed 6 years ago

deblauwetom commented 6 years ago

What is the recommended way to handle stuff in a loop with libq?

I am trying to do a webclient that repeatedly sends some records to a database backend online.

So I have a queue + a blocking execution context inside an std::thread in my webclient class, and also another "dispatcher" class which also has a different queue + blocking execution context + std::thread . The dispatcher class will periodically read from the database and send 5 records using the webclient.

So I have something like this in the dispatcher:

    auto observationsToSync = observationsDatabase->getNextUnsynchronisedObservations(5);
    std::vector<q::promise<bool>> syncCalls;
    for(const auto& observation : observationsToSync)
    {
        syncCalls.emplace_back(client->sendJson("/api/observations", observation));
    }
    if(!syncCalls.empty()) {
        q::all(syncCalls, qExecContext->queue()).then([this](auto&& results){
           //....indicate that these records are synced
        });
    }

But now I want to do this code periodically, and so I could of course just do this in a separate thread, but it would be nice to know what the best "libq"-way is to accomplish this?

deblauwetom commented 6 years ago

So I came up with this solution. I first thought it would result in an ever-increasing stack, but as far as I can tell from my debugger, it does not:

I call this function once at startup, then it keeps going, very nice:

    q::promise<> BackendDispatcherImpl::sendSomeObservations()
    {
        return q::with( qExecContext->queue() ).then([this](){
            std::cout << "Hello world:" << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(2s);
            std::cout << "Hello world after 2s:" << std::this_thread::get_id() << std::endl;
            return sendSomeObservations();
        });
    }

So if you have an alternative approach, I would love to hear it.

Thanks, Best regards

grantila commented 6 years ago

This problem is fundamentally about

  1. Lifecycle management (your [this] closure)
  2. Backpressure handling

Neither of which is trivial.

But assuming your BackendDispatcherImpl lives throughout the lifecycle of the application, (1) is fine.

Regarding looping, you need to somehow ensure (or test) that input doesn't arrive faster than you can handle and output it. As long as that holds, your loop is fine. You will, however, have a thread that blocks (sleep_for), which is not pretty. If this is all your program does, leave it as is; it's fine.

But if you want a cleaner solution, let an execution context do the sleeping instead. It can handle "multiple sleeps", i.e. you can schedule multiple things to eventually happen on the same execution context; it is its own event loop, after all. On a promise, you can call: .delay( std::chrono::seconds( 2 ) )

Have a look at tests/q/src/promise/delay.cpp
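To illustrate what "multiple sleeps on one execution context" means, here is a minimal, libq-free sketch of an event loop that keeps a time-ordered queue of pending callbacks. This is conceptual only; q's real execution contexts are implemented differently, and `TinyTimerLoop` is an invented name:

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <queue>
#include <thread>
#include <vector>

using Clock = std::chrono::steady_clock;

// Illustration only: one loop that can hold many pending "sleeps" at once,
// running each callback when its deadline passes. Only one thread ever
// sleeps, no matter how many delayed tasks are queued.
class TinyTimerLoop
{
    struct Entry
    {
        Clock::time_point when;
        std::function<void()> fn;
        bool operator>(const Entry& other) const { return when > other.when; }
    };

    // Min-heap ordered by deadline: the earliest entry is always on top.
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> pending_;

public:
    void schedule(std::chrono::milliseconds delay, std::function<void()> fn)
    {
        pending_.push({ Clock::now() + delay, std::move(fn) });
    }

    // Runs until no callbacks remain. A callback may schedule() new work,
    // including rescheduling itself, which gives you a periodic task.
    void run()
    {
        while (!pending_.empty())
        {
            Entry next = pending_.top();
            pending_.pop();
            std::this_thread::sleep_until(next.when);
            next.fn();
        }
    }
};
```

A periodic task then becomes a callback that calls `schedule()` on itself before returning, with no thread dedicated to sleeping per task.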

You can also break the loop if you like. It would be something like this:

    // This actually sends
    q::promise<> BackendDispatcherImpl::actuallySendSomeObservations();

    q::promise<> BackendDispatcherImpl::scheduleSendSomeObservations()
    {
        return q::with( qExecContext->queue() )
        .delay( std::chrono::seconds( 2 ) )
        .then([this](){
            actuallySendSomeObservations();
            // recurse, unless asked to stop
            if (this->shouldStop)
                return q::with( qExecContext->queue() );
            return scheduleSendSomeObservations();
        });
    }

metalMajor commented 1 year ago

FYI, it seems this is not a good approach after all. Years later, I finally discovered that this construction causes unbounded memory growth, probably because it builds one huge chain of promises, and the library keeps a small amount of memory allocated per promise in the chain.
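One general way to avoid an ever-growing chain (a hedged sketch, not libq code): instead of returning the next iteration's promise from inside the continuation, which links every iteration into one chain, have each tick post the next tick as an independent, fresh task. In libq terms that would mean not returning the recursive promise from `.then()`, assuming the library tolerates a detached continuation. The pattern itself, shown with a plain function queue and invented names (`FlatLoop`, `tick`):

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Illustration only: a flat task queue. Each tick posts the NEXT tick as a
// brand-new task instead of chaining a continuation onto the previous one,
// so no per-iteration state accumulates between ticks.
struct FlatLoop
{
    std::deque<std::function<void()>> tasks;

    void post(std::function<void()> fn) { tasks.push_back(std::move(fn)); }

    void run()
    {
        while (!tasks.empty())
        {
            auto fn = std::move(tasks.front());
            tasks.pop_front();
            fn();  // the task may post() the next iteration
        }
    }
};

// One "tick" that reschedules itself until `remaining` reaches zero.
// The references must outlive run(); here run() is synchronous, so they do.
void tick(FlatLoop& loop, int& counter, int remaining)
{
    ++counter;
    if (remaining > 1)
        loop.post([&loop, &counter, remaining]{ tick(loop, counter, remaining - 1); });
}
```

The key difference from the recursive-promise version above is that once a tick finishes, nothing holds a reference back to it, so each iteration's state can be freed immediately.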