Closed: fabian-jung closed this pull request 6 months ago.
Thank you for your work. I added some review comments. It would be nice if we could discuss them.
I don't understand where you want to go with such a policy or how to sensibly isolate it. Can you make a little draft of how you want it to be? Then I can make the changes.
I provided a threadpool implementation for you, see here. Have a look at the test cases for the usage. Nevertheless, you can use this threadpool to implement your future-returning async receive method without spawning a thread per method call:
template <typename T_CommunicationPolicy, typename T_GraphPolicy, typename SerializationPolicy>
template <typename T>
auto Cage<T_CommunicationPolicy, T_GraphPolicy, SerializationPolicy>::asyncRecv(
    const Edge& edge, T& data) -> std::future<void>
{
    VAddr srcVAddr = locateVertex(edge.source);
    std::promise<void> dataReceived;
    threadPool->post([&]() {
        auto result = communicator->asyncProbe(srcVAddr, edge.id, graphContext);
        if (result) {
            decltype(auto) skeleton = SerializationPolicy::prepare(data);
            communicator->recv(srcVAddr, edge.id, graphContext, skeleton);
            SerializationPolicy::restore(data, skeleton);
            dataReceived.set_value();
        } else {
            threadPool->post(asyncRecv(edge, data));
        }
    });
    return dataReceived.get_future();
}
First of all, a lambda is posted into the threadpool and executed there, so the function returns immediately with the future. Then the lambda polls recursively on the communicator until a message arrives and receives that message into the data buffer. Finally, the promise is set, which notifies the user that the data has been received.
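To make the intended usage explicit, calling code could consume the returned future roughly like this (cage, edge and doOtherWork() are placeholders, not part of the snippet above):

int data;
auto received = cage.asyncRecv(edge, data);  // returns immediately with the future
doOtherWork();                               // overlap communication with computation
received.wait();                             // blocks until the promise is fulfilled
// data now holds the received message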
What I meant by policy is the following: I provided only one implementation for the concurrent handling of the receive function (called ThreadPool). Your solution, which always spawns a new thread, is also a valid one. So we could agree on an interface and call it ConcurrentPolicy, etc.
See a full example here. If you like, I could finalize it, or we could meet for a quick hack session.
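As a rough sketch of what such a ConcurrentPolicy could look like, both strategies would only have to expose a way to run a nullary task asynchronously (the names and the post signature here are only illustrative, not a final interface):

#include <thread>
#include <utility>

// Illustrative only: one policy spawns a thread per call,
// the other forwards to the ThreadPool from PooledAsync.hpp.
struct ThreadPerCallPolicy {
    template <typename T_Task>
    void post(T_Task task) {
        std::thread(std::move(task)).detach(); // one short-lived thread per task
    }
};

struct PooledPolicy {
    ThreadPool pool{4}; // assumes the ThreadPool shown further below
    template <typename T_Task>
    void post(T_Task task) {
        pool.async(std::move(task)); // reuse the pooled worker threads
    }
};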
I think that the receive function will most likely not work. The std::promise holds the shared state and will destroy it when it is destructed. The future will then throw a std::future_error on get() or wait(). This behaviour is documented here: http://en.cppreference.com/w/cpp/thread/promise
One solution is to tie the std::promise to the task or to the thread that is executing the task.
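A tiny self-contained sketch of the failure mode (not from the project, only to illustrate the documented behaviour):

#include <future>
#include <iostream>

int main() {
    std::future<void> f;
    {
        std::promise<void> p;   // local promise, as in the asyncRecv draft
        f = p.get_future();
    }                           // p is destroyed before set_value() was called
    try {
        f.get();                // throws std::future_error
    } catch(const std::future_error& e) {
        std::cout << e.code().message() << std::endl; // typically reports a broken promise
    }
    return 0;
}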
The lambda looks good to me. The recursive call enables the threadpool to clean up the tasks without completing them all. An alternative would be a loop around a bool that is set when the cage enters its destructor.
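Just to make that alternative concrete, the body of the posted lambda from the draft above could loop on a flag instead of re-posting itself; this assumes the cage had something like a std::atomic<bool> shuttingDown member that the destructor sets (the flag name is made up for illustration):

threadPool->post([&]() {
    while(!shuttingDown) {                      // hypothetical member flag set by ~Cage()
        if(communicator->asyncProbe(srcVAddr, edge.id, graphContext)) {
            decltype(auto) skeleton = SerializationPolicy::prepare(data);
            communicator->recv(srcVAddr, edge.id, graphContext, skeleton);
            SerializationPolicy::restore(data, skeleton);
            dataReceived.set_value();
            return;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1)); // avoid hot spinning
    }
});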
I wrote a threadpool that creates typed std::futures to solve the first problem. The promises are tied to the tasks.
//PooledAsync.hpp
#pragma once

#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <atomic>
#include <algorithm>
#include <memory>
#include <future>
#include <type_traits>

namespace detail {

    // This is the interface for the task list
    class CallableInterface {
    public:
        // The tasks have no input and no output;
        // they are only executed by the threadpool.
        // The virtual function dispatch is necessary
        // because the actions and execution order are only
        // known at runtime. I could have done this with function
        // pointers, but that comes with the same overhead
        // while adding more code obfuscation and possible
        // memory leaks.
        virtual void operator()() = 0;
        virtual ~CallableInterface() {};
    };

    template <class Functor>
    class Callable : public CallableInterface {
        // The copy constructor is deleted for safety. The Functor
        // may come with internal state that is not trivially
        // copyable.
        Callable(const Callable& copy) = delete;
    public:
        // Result of the functor
        using Result = typename std::result_of<Functor()>::type;
        // The task wrapper. This is what holds the std::promise
        // and therefore the shared state. It will not be deleted until
        // completion of the task, after which the state is moved
        // to the future object in the client code.
        std::packaged_task<Result()> task;

        Callable(Functor functor) :
            task(functor)
        {};

        ~Callable() override {
        }

        void operator()() override {
            // Simple task execution. The std::packaged_task will set
            // the value of the future and make it ready. After this
            // point in time, the packaged_task can be deleted safely.
            task();
        };
    };

    // This is my implementation of a queue with atomic operations.
    // The atomicity is achieved by coarse-grained locking, but that
    // should not harm the performance, since only small pointers are
    // pushed to and popped from the queue and empty() should not take
    // long anyway.
    template <class Type>
    class AtomicQueue {
        std::mutex mutex;
        std::queue<Type> queue;
        // To reduce CPU load when no tasks are in the queue, I changed
        // the pop into a wait_pop. That is why I need the condition variable.
        std::condition_variable cv;
    public:
        AtomicQueue() {};

        // The push method makes use of move semantics, if possible or needed.
        void push(Type&& value) {
            std::unique_lock<std::mutex> lock(mutex);
            queue.push(std::move(value));
            cv.notify_one();
        }

        // wait_pop will try to pop an element from the queue. If
        // there are no elements in the queue, the function will wait until
        // a timeout is reached. If no elements are added in this time,
        // the function will return and indicate the failure by the return
        // value "false".
        bool wait_pop(Type& value) {
            std::unique_lock<std::mutex> lock(mutex);
            if(queue.empty()) cv.wait_for(lock, std::chrono::milliseconds(200));
            if(!queue.empty()) {
                value = std::move(queue.front());
                queue.pop();
                return true;
            }
            return false;
        }

        bool empty() {
            std::unique_lock<std::mutex> lock(mutex);
            return queue.empty();
        }
    };
}

class ThreadPool {
    // This flag is set to false when the ThreadPool is being destructed.
    // std::atomic makes the change visible to the worker threads.
    std::atomic<bool> running;
    // The thread pool
    std::vector<std::thread> threads;
    // Queue of tasks to do
    detail::AtomicQueue<std::unique_ptr<detail::CallableInterface>> tasks;

    // The function executed by the spawned threads
    void run() {
        while(running or !tasks.empty()) {
            std::unique_ptr<detail::CallableInterface> task;
            // Try to get a task to do. If not successful, continue spinning,
            // else execute the task.
            if(tasks.wait_pop(task)) {
                task->operator()();
            }
        }
    }

public:
    ThreadPool(unsigned int threadCount) :
        running(true)
    {
        for(unsigned int i = 0; i < threadCount; i++) threads.emplace_back(&ThreadPool::run, this);
    }

    // The destructor will make all threads joinable by joining them. This
    // may or may not be wanted: if one thread is stuck because a task can not
    // be completed, the destructor will not finish.
    // I found this behaviour to be the most consistent with the RAII concept.
    ~ThreadPool() {
        running = false;
        for(auto& thread : threads) {
            thread.join();
        };
    }

    // This is the method to add a new task to the queue.
    template <class Functor>
    std::future<typename std::result_of<Functor()>::type> async(Functor function) {
        using Callable = detail::Callable<Functor>;
        using ResultType = typename std::result_of<Functor()>::type;
        // Construct a new Callable object from the functor.
        Callable* callable = new Callable(function);
        // Get the future from the Callable.
        std::future<ResultType> future = callable->task.get_future();
        // Push the Callable to the task queue. This is the reason we need the
        // virtual dispatch: we can not know at compile time what actions might be
        // pushed and what return values they have.
        // The big advantage of the virtual class interface over std::function is
        // that we support multiple return types. With std::function the signature
        // would be needed at this point.
        tasks.push(std::unique_ptr<detail::CallableInterface>(callable));
        // Move the future out.
        return std::move(future);
    }
};
A usage example is here:
#include "PooledAsync.hpp"
#include <iostream>
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> results;
for(int i = 0; i < 16; i++) {
results.push_back(pool.async([i]() -> int {
std::cout << "Task " << i << " executed on thread " << std::this_thread::get_id() << std::endl;
if(i == 0) std::this_thread::sleep_for(std::chrono::milliseconds(3000));
std::cout << "Task " << i << " finished." << std::endl;
return i;
}));
}
std::cout << "Results:" << std::endl;
for(auto& f : results) {
std::cout << f.get() << std::endl;
}
return 0;
};
I tested the code with valgrind to check for leaks (virtual destructors scared me a little). It looks alright.
I think that the receive function will most likely not work. The std::promise holds the shared state and will destroy it when it is destructed. The future will then throw a std::future_error on get() or wait().
Sorry, my fault: I forgot to put the promise into a shared pointer and copy it into the lambda:
template <typename T_CommunicationPolicy, typename T_GraphPolicy, typename SerializationPolicy>
template <typename T>
auto Cage<T_CommunicationPolicy, T_GraphPolicy, SerializationPolicy>::asyncRecv(
    const Edge& edge, T& data) -> std::future<void>
{
    VAddr srcVAddr = locateVertex(edge.source);
    // The promise lives in a shared_ptr, so the shared state survives
    // until the posted lambda has fulfilled it.
    auto dataReceived = std::make_shared<std::promise<void>>();
    threadPool->post([this, dataReceived, srcVAddr, edge, &data]() {
        auto result = communicator->asyncProbe(srcVAddr, edge.id, graphContext);
        if (result) {
            decltype(auto) skeleton = SerializationPolicy::prepare(data);
            communicator->recv(srcVAddr, edge.id, graphContext, skeleton);
            SerializationPolicy::restore(data, skeleton);
            dataReceived->set_value();
        } else {
            threadPool->post(asyncRecv(edge, data));
        }
    });
    return dataReceived->get_future();
}
Your threadpool implementation looks valid to me, but it reinvents the wheel. I think the io_service of Boost.Asio already does a good job. Nevertheless, let's talk about an interface for these threadpools so we can provide several implementations. I would suggest the following:
class ThreadPoolBase {
    template <typename T_Task>
    void post(const T_Task& task) = delete;
};
I think it's not necessary to return a future. You can also capture the promise and set its state when the work is done. What do you think about the interface?
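For example, a pool backed by Boost.Asio could satisfy that post interface roughly like this (sketch only; the class and member names are illustrative, and io_service is the pre-1.66 name, newer Boost versions call it io_context):

#include <boost/asio.hpp>
#include <thread>
#include <vector>

// Illustrative io_service based pool: posted handlers are distributed
// over a fixed set of worker threads running io_service::run().
class AsioThreadPool {
    boost::asio::io_service io;
    boost::asio::io_service::work work{io}; // keeps run() alive while the queue is idle
    std::vector<std::thread> workers;
public:
    explicit AsioThreadPool(unsigned int threadCount) {
        for(unsigned int i = 0; i < threadCount; i++)
            workers.emplace_back([this]() { io.run(); });
    }
    ~AsioThreadPool() {
        io.stop();
        for(auto& t : workers) t.join();
    }
    template <typename T_Task>
    void post(const T_Task& task) {
        io.post(task);
    }
};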
I ran a benchmark where I compared the original event-based async receive vs. the future-based async receive with threadpool. There is no measurable performance drop:
Benchmark Time CPU Iterations
-----------------------------------------------------------------------------
meassureAsyncReceiveEventZmq/1 675452 ns 299076 ns 2210
meassureAsyncReceiveEventZmq/10 488639 ns 279084 ns 2019
meassureAsyncReceiveEventZmq/100 499917 ns 284202 ns 2770
meassureAsyncReceiveEventZmq/1000 469693 ns 282007 ns 2515
meassureAsyncReceiveEventZmq/9.76562k 629828 ns 442471 ns 1532
meassureAsyncReceiveEventZmq/97.6562k 2301000 ns 2074431 ns 328
meassureAsyncReceiveEventZmq/976.562k 18394031 ns 18076931 ns 39
meassureAsyncReceiveFutureZmq/1 440308 ns 259933 ns 2657
meassureAsyncReceiveFutureZmq/10 433200 ns 256121 ns 2603
meassureAsyncReceiveFutureZmq/100 437497 ns 259767 ns 2630
meassureAsyncReceiveFutureZmq/1000 519450 ns 286996 ns 2540
meassureAsyncReceiveFutureZmq/9.76562k 612804 ns 435234 ns 1537
meassureAsyncReceiveFutureZmq/97.6562k 2235897 ns 2020391 ns 339
meassureAsyncReceiveFutureZmq/976.562k 18263627 ns 17938688 ns 39
Nice work!
The fix with the std::shared_ptr looks nice and easy. The interface to the threadpool also looks nice.
One thing I would improve is the "std::shared_ptr<std::promise<void>>"
I merged my pull request with an example implementation. I leave it up to you to improve the example. I would like to merge your first commit. So could you remove the last commit from this PR?
I removed the busy waiting from the ZMQ event system. All the tests (checkFast and check) pass.
Additionally, I added a std::future API call for cage::recv. This is not tested yet, and I would consider it work in progress.
Please leave a comment on what you think about these two changes.