gul-cpp / gul14

General Utility Library for C++14
https://gul14.info/
GNU Lesser General Public License v2.1

Thread pool #62

Closed. alt-graph closed this 7 months ago.

alt-graph commented 8 months ago

This is an implementation of a ThreadPool class. We have some similar code in use in our HLC libs across multiple servers.

Some notes:

Closes #28

Finii commented 8 months ago

I'm not sure why we need to self-roll this. Why not simply

https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/thread_pool.html


We use (of course) boost::asio for serial communication, for example. The boost thread_pool is also based on asio. No need to develop and debug if there is a battle-tested alternative so readily available. Boost also has fibers, if that suits better than threads.

alt-graph commented 8 months ago

> I'm not sure why we need to self-roll this. Why not simply
>
> https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/thread_pool.html

You may be right, but so far we avoid boost dependencies in the DOOCS core libs, and I am not too enthusiastic about opening that can of worms – again, because we had that discussion already some years ago.

This ThreadPool also has some features that are not in boost, like scheduling tasks for a delayed start. Also the task names are something that I have not seen in any other implementation, but they are invaluable for debugging if you have a task with a deadlock. The implementation here went from being in one server to being copied around to a handful to being a part of HLC Util to the current rework for GUL. What I am trying to say is: It has already seen quite a bit of usage and would immediately be useful as-is.

soerengrunewald commented 8 months ago

> You may be right, but so far we avoid boost dependencies in the DOOCS core libs, and I am not too enthusiastic about opening that can of worms – again, because we had that discussion already some years ago.

I would also avoid adding additional dependencies to the core libraries. Nonetheless, there are a lot of thread pool libraries out there.

> This ThreadPool also has some features that are not in boost, like scheduling tasks for a delayed start. Also the task names are something that I have not seen in any other implementation, but they are invaluable for debugging if you have a task with a deadlock. The implementation here went from being in one server to being copied around to a handful to being a part of HLC Util to the current rework for GUL. What I am trying to say is: It has already seen quite a bit of usage and would immediately be useful as-is.

But this raises the question: do you plan to use the ThreadPool both in the DOOCS core libraries and outside of them, so that we need it in GUL?

I'm not against adding it per se, but I also don't want to see GUL growing into a full-fledged framework.

soerengrunewald commented 8 months ago

I would suggest inlining all the one- or two-line methods (like count_threads, count_pending, etc.), i.e. moving the implementation from the .cc file into the header. This would allow for better optimization without the need for LTO to be enabled.

alt-graph commented 8 months ago

> But this raises the question: do you plan to use the ThreadPool both in the DOOCS core libraries and outside of them, so that we need it in GUL?

We already use it outside, and I am playing with the idea of adding a small thread pool to the DOOCS-over-ZeroMQ asynchronous receiver. Since our main model for subscriptions is user callbacks, it might make sense to distribute the callbacks over a thread pool for high load scenarios. ZeroMQ is blazingly fast in receiving messages, but user callbacks may not always be able to keep up. I am not sure if we will really need this, but the new archiver might be a use case.

> I'm not against adding it per se, but I also don't want to see GUL growing into a full-fledged framework.

I agree, but for me a thread pool/task queue has a similar level of utility as the SlidingBuffer. I frankly don't want to live without either anymore.

alt-graph commented 8 months ago

I believe this is ready for review now. The documentation looks OK, and the tests seem to pass.

alt-graph commented 8 months ago

Hold on, I guess I'll move the std::invoke-related things into their own PR and squash some of the other commits. Otherwise this is just too much...

alt-graph commented 8 months ago

OK, now this looks more reviewable than before. I recommend reviewing the entire diff rather than individual commits, though. If anyone prefers I can move the Meson changes (for building unit tests with static libs) into a separate PR.

alt-graph commented 7 months ago

Force-pushed to resolve a merge conflict. This is still in need of a review; please let me know if you prefer a squashed version.

Finii commented 7 months ago

I will review today.

Finii commented 7 months ago

Suggested Patch 2, maybe include something similar

(This does not fix the examples' code)

```diff
From bdbc964d0664229f97bf32cd1aa5b62d85d21a90 Mon Sep 17 00:00:00 2001
From: Fini Jastrow <ulf.fini.jastrow@desy.de>
Date: Tue, 27 Feb 2024 02:58:10 +0100
Subject: [PATCH 2/2] CI: Try to compile examples

Signed-off-by: Fini Jastrow <ulf.fini.jastrow@desy.de>
---
 .github/workflows/testposix.yml | 3 +++
 examples/meson.build            | 9 +++++++++
 meson.build                     | 1 +
 3 files changed, 13 insertions(+)
 create mode 100644 examples/meson.build

diff --git a/.github/workflows/testposix.yml b/.github/workflows/testposix.yml
index 085a57f..308ed10 100644
--- a/.github/workflows/testposix.yml
+++ b/.github/workflows/testposix.yml
@@ -64,6 +64,9 @@ jobs:
       - name: Build and run ASAN tests
         run: ./meson.py test -C build.asan

+      - name: Build examples
+        run: ninja -C build.asan examples/thread_pool_example examples/trim_example
+
       - name: Build and run release tests
         run: ./meson.py test -C build.release

diff --git a/examples/meson.build b/examples/meson.build
new file mode 100644
index 0000000..b4f4f9c
--- /dev/null
+++ b/examples/meson.build
@@ -0,0 +1,9 @@
+executable('trim_example', 'trim.cc',
+    dependencies : [ libgul_static_dep ],
+    build_by_default : false,
+)
+
+executable('thread_pool_example', 'thread_pool.cc',
+    dependencies : [ libgul_static_dep ],
+    build_by_default : false,
+)
diff --git a/meson.build b/meson.build
index 4d923e5..f299659 100644
--- a/meson.build
+++ b/meson.build
@@ -25,6 +25,7 @@ subdir('tests')

 message('Install prefix: ' + get_option('prefix'))
 subdir('debian')
+subdir('examples')

 message('LIBGUL_API_VERSION       @0@'.format(libgul_api_version))
 message('GIT FULL API VERSION    @0@'.format(git_full_api_version))
-- 
2.40.1
```
Finii commented 7 months ago

Patch 1: How to make one object out of ThreadPool and ThreadPoolEngine

(Rough, just to show the idea)

The engine_ member must come last of all data members; maybe that needs a comment in the code. It does come last in the diff, but that is easily forgotten over time.

Edit: Added the two paragraphs above. Edit: Hid the diff in details.

```diff
From 3dd323bec244dccb22f09187727da00a74b6f92f Mon Sep 17 00:00:00 2001
From: Fini Jastrow <ulf.fini.jastrow@desy.de>
Date: Tue, 27 Feb 2024 02:53:33 +0100
Subject: [PATCH 1/2] Join ThreadPool and ThreadPoolEngine

Signed-off-by: Fini Jastrow <ulf.fini.jastrow@desy.de>
---
 include/gul14/ThreadPool.h | 163 ++++++-------------------------------
 src/ThreadPool.cc          |  37 +++++----
 tests/test_ThreadPool.cc   |  18 ++--
 3 files changed, 53 insertions(+), 165 deletions(-)

diff --git a/include/gul14/ThreadPool.h b/include/gul14/ThreadPool.h
index 9498cae..c485afc 100644
--- a/include/gul14/ThreadPool.h
+++ b/include/gul14/ThreadPool.h
@@ -40,14 +40,14 @@ namespace gul14 {
 
-class ThreadPoolEngine;
+class ThreadPool;
 
 using TaskId = std::uint64_t;
 
 namespace detail {
 
-bool cancel_task(std::weak_ptr<ThreadPoolEngine> pool, TaskId task_id);
-bool is_running(std::weak_ptr<ThreadPoolEngine> pool, TaskId task_id);
-bool is_pending(std::weak_ptr<ThreadPoolEngine> pool, TaskId task_id);
+bool cancel_task(std::weak_ptr<ThreadPool> pool, TaskId task_id);
+bool is_running(std::weak_ptr<ThreadPool> pool, TaskId task_id);
+bool is_pending(std::weak_ptr<ThreadPool> pool, TaskId task_id);
 
 } // namespace detail
 
 /**
@@ -136,13 +136,13 @@ public:
     bool is_running() const { return detail::is_running(pool_, id_); }
 
 private:
-    friend class ThreadPoolEngine;
+    friend class ThreadPool;
 
     std::future<T> future_;
     TaskId id_;
-    std::weak_ptr<ThreadPoolEngine> pool_;
+    std::weak_ptr<ThreadPool> pool_;
 
-    TaskHandle(TaskId id, std::future<T> future, std::shared_ptr<ThreadPoolEngine> pool)
+    TaskHandle(TaskId id, std::future<T> future, std::shared_ptr<ThreadPool> pool)
         : future_{ std::move(future) }
         , id_{ id }
         , pool_{ pool }
@@ -178,7 +178,7 @@ private:
  *
  * \since GUL version 2.11
  */
-class ThreadPoolEngine : public std::enable_shared_from_this<ThreadPoolEngine>
+class ThreadPool
 {
 public:
     using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
@@ -190,7 +190,7 @@ public:
     /// Maximum possible number of threads
     constexpr static std::size_t max_threads{ 10'000 };
 
-    ThreadPoolEngine(std::size_t num_threads, std::size_t capacity);
+    ThreadPool(std::size_t num_threads, std::size_t capacity = 200);
 
     /**
      * Destruct the ThreadPoolEngine and join all threads.
@@ -199,7 +199,7 @@ public:
      * been started in one of the threads gets canceled, but work that has already been
      * assigned to a thread continues to be executed until it completes.
      */
-    ~ThreadPoolEngine();
+    ~ThreadPool();
 
     /**
      * Enqueue a task.
@@ -217,10 +217,10 @@ public:
      * \exception std::runtime_error is thrown if the queue is full.
      */
     template <typename Function>
-    TaskHandle<invoke_result_t<Function, ThreadPoolEngine&>>
+    TaskHandle<invoke_result_t<Function, ThreadPool&>>
     add_task(Function fct, TimePoint start_time = {}, std::string name = {})
     {
-        using Result = invoke_result_t<Function, ThreadPoolEngine&>;
+        using Result = invoke_result_t<Function, ThreadPool&>;
 
         TaskHandle<Result> task_handle = [this, &fct, start_time, &name]()
         {
@@ -233,13 +233,13 @@ public:
                     pending_tasks_.size(), ')'));
             }
 
-            using PackagedTask = std::packaged_task<Result(ThreadPoolEngine&)>;
+            using PackagedTask = std::packaged_task<Result(ThreadPool&)>;
 
             auto named_task_ptr = std::make_unique<NamedTask<PackagedTask>>(
                 PackagedTask{ std::move(fct) }, std::move(name));
 
-            TaskHandle<Result> handle{
-                next_task_id_, named_task_ptr->fct_.get_future(), shared_from_this() };
+            TaskHandle<Result> handle(
+                next_task_id_, named_task_ptr->fct_.get_future(), engine_ );
 
             pending_tasks_.emplace_back(
                 next_task_id_, std::move(named_task_ptr), start_time);
@@ -260,13 +260,13 @@ public:
     add_task(Function fct, TimePoint start_time = {}, std::string name = {})
     {
         return add_task(
-            [f = std::move(fct)](ThreadPoolEngine&) { return f(); },
+            [f = std::move(fct)](ThreadPool&) { return f(); },
             start_time, std::move(name));
     }
 
     template <typename Function, typename Duration,
-        std::enable_if_t<is_invocable<Function, ThreadPoolEngine&>::value, bool> = true>
-    TaskHandle<invoke_result_t<Function, ThreadPoolEngine&>>
+        std::enable_if_t<is_invocable<Function, ThreadPool&>::value, bool> = true>
+    TaskHandle<invoke_result_t<Function, ThreadPool&>>
     add_task(Function fct, Duration delay_before_start, std::string name = {})
     {
         return add_task(std::move(fct),
@@ -283,8 +283,8 @@ public:
     }
 
     template <typename Function,
-        std::enable_if_t<is_invocable<Function, ThreadPoolEngine&>::value, bool> = true>
-    TaskHandle<invoke_result_t<Function>>
+        std::enable_if_t<is_invocable<Function, ThreadPool&>::value, bool> = true>
+    TaskHandle<invoke_result_t<Function>>
     add_task(Function fct, std::string name)
     {
         return add_task(std::move(fct), TimePoint{}, std::move(name));
@@ -363,7 +363,7 @@ private:
         {}
 
         virtual ~NamedTask() = default;
-        virtual void operator()(ThreadPoolEngine& pool) = 0;
+        virtual void operator()(ThreadPool& pool) = 0;
 
         std::string name_;
     };
@@ -377,7 +377,7 @@ private:
         , fct_{ std::move(fct) }
         {}
 
-        void operator()(ThreadPoolEngine& pool) override { fct_(pool); }
+        void operator()(ThreadPool& pool) override { fct_(pool); }
 
         FunctionType fct_;
     };
@@ -418,6 +418,9 @@ private:
     TaskId next_task_id_ = 0;
     bool shutdown_requested_{ false };
 
+    std::shared_ptr<ThreadPool> engine_;
+
+
     /// Non-locking internal versions of the public functions
     bool is_full_i() const noexcept;
 
@@ -454,122 +457,6 @@ private:
  *
  * \since GUL version 2.11
  */
-class ThreadPool
-{
-public:
-    using TimePoint = ThreadPoolEngine::TimePoint;
-    using Duration = ThreadPoolEngine::Duration;
-
-    /**
-     * Create a thread pool with the desired number of threads and the specified capacity
-     * for queuing tasks.
-     *
-     * This constructor launches the desired number of threads. The threads are joined
-     * when the ThreadPool object gets destroyed.
-     *
-     * \param num_threads  Desired number of threads
-     * \param capacity     Maximum number of pending tasks that can be queued
-     *
-     * \exception std::invalid_argument is thrown if the desired number of threads is
-     *            zero or greater than max_threads, or if the requested capacity is zero
-     *            or exceeds max_capacity.
-     */
-    ThreadPool(std::size_t num_threads, std::size_t capacity = 200)
-        : engine_{ std::make_shared<ThreadPoolEngine>(num_threads, capacity) }
-    {}
-
-    /**
-     * Enqueue a task.
-     *
-     * There are multiple overloads of this function for variations of the arguments:
-     *
-     * \param fct   The first argument must be a function object or function pointer to be
-     *              executed. This function can have an arbitrary return type and may
-     *              either take no arguments (`T fct()`) or a reference to the
-     *              ThreadPoolEngine by which it gets executed
-     *              (`T fct(ThreadPoolEngine&)`).
-     * \param args  Start time and/or task name: As an optional second parameter, either a
-     *              time point or a duration can be given. A time point
-     *              specifies the earliest time at which the task may be started, a
-     *              duration specifies a time point relative to the current time.
-     *              As an optional last parameter, a task name can be specified.
-     *              This is mainly useful for debugging.
-     *
-     * \returns a TaskHandle for this task.
-     * \exception std::runtime_error is thrown if the queue is full.
-     *
-     * \code{.cpp}
-     * ThreadPool pool(2); // Create a pool with 2 threads
-     *
-     * // A simple task that does not interact with the pool
-     * pool.add_task([]() { std::cout << "Task 1\n"; });
-     *
-     * // A task that schedules another task to start two seconds later
-     * pool.add_task(
-     *     [](ThreadPoolEngine& pool)
-     *     {
-     *         std::cout << "Task 2\n";
-     *         pool.add_task([]() { std::cout << "Task 3\n"; }, 2s);
-     *     });
-     *
-     * // A task with a name
-     * pool.add_task([]() { std::cout << "Task 4\n"; }, "Task 4");
-     * \endcode
-     */
-    template <typename Function, typename... Args>
-    auto add_task(Function fct, Args&&... args)
-    {
-        static_assert(
-            is_invocable<Function>::value
-            || is_invocable<Function, ThreadPoolEngine&>::value,
-            "Invalid function signature: Must be T fct() or T fct(ThreadPoolEngine&)");
-
-        return engine_->add_task(std::forward<Function>(fct), std::forward<Args>(args)...);
-    }
-
-    /**
-     * Remove all pending tasks from the queue.
-     *
-     * This call removes all tasks that have not yet been started from the queue. It has
-     * no impact on tasks that are currently being executed.
-     *
-     * \returns the number of tasks that were removed.
-     */
-    std::size_t cancel_pending_tasks() { return engine_->cancel_pending_tasks(); }
-
-    /// Return the maximum number of pending tasks that can be queued.
-    std::size_t capacity() const noexcept { return engine_->capacity(); }
-
-    /// Return the number of pending tasks.
-    std::size_t count_pending() const { return engine_->count_pending(); }
-
-    /// Return the number of threads in the pool.
-    std::size_t count_threads() const noexcept { return engine_->count_threads(); }
-
-    /// Return a vector with the names of the tasks that are waiting to be executed.
-    std::vector<std::string> get_pending_task_names() const
-    {
-        return engine_->get_pending_task_names();
-    }
-
-    /// Return a vector with the names of the tasks that are currently running.
-    std::vector<std::string> get_running_task_names() const
-    {
-        return engine_->get_running_task_names();
-    }
-
-    /// Determine whether the queue for pending tasks is full (at capacity).
-    bool is_full() const noexcept { return engine_->is_full(); }
-
-    /**
-     * Return true if the pool has neither pending tasks nor tasks that are currently
-     * being executed.
-     */
-    bool is_idle() const { return engine_->is_idle(); }
-
-private:
-    std::shared_ptr<ThreadPoolEngine> engine_;
-};
 
 /// @}
diff --git a/src/ThreadPool.cc b/src/ThreadPool.cc
index eb1b098..b455dd8 100644
--- a/src/ThreadPool.cc
+++ b/src/ThreadPool.cc
@@ -32,7 +32,7 @@ namespace gul14 {
 
 namespace detail {
 
-bool cancel_task(std::weak_ptr<ThreadPoolEngine> pool, TaskId id)
+bool cancel_task(std::weak_ptr<ThreadPool> pool, TaskId id)
 {
     auto shared_ptr = pool.lock();
     if (!shared_ptr)
@@ -41,7 +41,7 @@
     return shared_ptr->cancel_pending_task(id);
 }
 
-bool is_pending(std::weak_ptr<ThreadPoolEngine> pool, TaskId id)
+bool is_pending(std::weak_ptr<ThreadPool> pool, TaskId id)
 {
     auto shared_ptr = pool.lock();
     if (!shared_ptr)
@@ -50,7 +50,7 @@
     return shared_ptr->is_pending(id);
 }
 
-bool is_running(std::weak_ptr<ThreadPoolEngine> pool, TaskId id)
+bool is_running(std::weak_ptr<ThreadPool> pool, TaskId id)
 {
     auto shared_ptr = pool.lock();
     if (!shared_ptr)
@@ -62,9 +62,10 @@
 
 } // namespace detail
 
-ThreadPoolEngine::ThreadPoolEngine(std::size_t num_threads, std::size_t capacity)
+ThreadPool::ThreadPool(std::size_t num_threads, std::size_t capacity)
     : capacity_(capacity)
 {
+    engine_.reset(this, [](ThreadPool*){});
     if (num_threads == 0 || num_threads > max_threads)
     {
         throw std::invalid_argument(
@@ -79,7 +80,7 @@
         threads_.emplace_back([this]() { perform_work(); });
 }
 
-ThreadPoolEngine::~ThreadPoolEngine()
+ThreadPool::~ThreadPool()
 {
     std::unique_lock<std::mutex> lock(mutex_);
     shutdown_requested_ = true;
@@ -93,7 +94,7 @@
     }
 }
 
-bool ThreadPoolEngine::cancel_pending_task(const TaskId task_id)
+bool ThreadPool::cancel_pending_task(const TaskId task_id)
 {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -108,7 +109,7 @@
     return false;
 }
 
-std::size_t ThreadPoolEngine::cancel_pending_tasks()
+std::size_t ThreadPool::cancel_pending_tasks()
 {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -118,18 +119,18 @@
     return num_removed;
 }
 
-std::size_t ThreadPoolEngine::count_pending() const
+std::size_t ThreadPool::count_pending() const
 {
     std::lock_guard<std::mutex> lock(mutex_);
     return pending_tasks_.size();
 }
 
-std::size_t ThreadPoolEngine::count_threads() const noexcept
+std::size_t ThreadPool::count_threads() const noexcept
 {
     return threads_.size();
 }
 
-std::vector<std::string> ThreadPoolEngine::get_pending_task_names() const
+std::vector<std::string> ThreadPool::get_pending_task_names() const
 {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -142,30 +143,30 @@
     return names;
 }
 
-std::vector<std::string> ThreadPoolEngine::get_running_task_names() const
+std::vector<std::string> ThreadPool::get_running_task_names() const
 {
     std::lock_guard<std::mutex> lock(mutex_);
     return running_task_names_;
 }
 
-bool ThreadPoolEngine::is_full() const noexcept
+bool ThreadPool::is_full() const noexcept
 {
     std::lock_guard<std::mutex> lock(mutex_);
     return is_full_i();
 }
 
-bool ThreadPoolEngine::is_full_i() const noexcept
+bool ThreadPool::is_full_i() const noexcept
 {
     return pending_tasks_.size() >= capacity_;
 }
 
-bool ThreadPoolEngine::is_idle() const
+bool ThreadPool::is_idle() const
 {
     std::lock_guard<std::mutex> lock(mutex_);
     return pending_tasks_.empty() && running_task_ids_.empty();
 }
 
-bool ThreadPoolEngine::is_pending(const TaskId task_id) const
+bool ThreadPool::is_pending(const TaskId task_id) const
 {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -175,20 +176,20 @@
     return it != pending_tasks_.end();
 }
 
-bool ThreadPoolEngine::is_running(const TaskId task_id) const
+bool ThreadPool::is_running(const TaskId task_id) const
 {
     std::lock_guard<std::mutex> lock(mutex_);
     auto it = std::find(running_task_ids_.begin(), running_task_ids_.end(), task_id);
     return it != running_task_ids_.end();
 }
 
-bool ThreadPoolEngine::is_shutdown_requested() const
+bool ThreadPool::is_shutdown_requested() const
 {
     std::lock_guard<std::mutex> lock(mutex_);
     return shutdown_requested_;
 }
 
-void ThreadPoolEngine::perform_work()
+void ThreadPool::perform_work()
 {
 #if defined(__APPLE__) || defined(__GNUC__)
     // On unixoid systems, we block a number of signals in the worker threads because we
diff --git a/tests/test_ThreadPool.cc b/tests/test_ThreadPool.cc
index f7c8ede..71362b4 100644
--- a/tests/test_ThreadPool.cc
+++ b/tests/test_ThreadPool.cc
@@ -261,9 +261,9 @@ TEST_CASE("ThreadPool: add_task(f(ThreadPool&, ...))", "[ThreadPool]")
     std::atomic<bool> done{ false };
 
     pool->add_task(
-        [&start](ThreadPoolEngine&) { while (!start) gul14::sleep(10us); });
+        [&start](ThreadPool&) { while (!start) gul14::sleep(10us); });
     auto handle = pool->add_task(
-        [&done](ThreadPoolEngine&) { done = true; }, "Task 2");
+        [&done](ThreadPool&) { done = true; }, "Task 2");
 
     REQUIRE(pool->count_pending() >= 1);
     REQUIRE(pool->count_pending() <= 2);
@@ -281,12 +281,12 @@
     const auto now = std::chrono::system_clock::now();
 
     auto task1 = pool->add_task(
-        [&last_job](ThreadPoolEngine&) { last_job = 1; }, now + 120s);
+        [&last_job](ThreadPool&) { last_job = 1; }, now + 120s);
     pool->add_task(
-        [&last_job](ThreadPoolEngine&) { last_job = 2; }, now + 2ms,
+        [&last_job](ThreadPool&) { last_job = 2; }, now + 2ms,
         "task 2 (usually runs second)");
     pool->add_task(
-        [&last_job](ThreadPoolEngine&) { last_job = 3; }, now,
+        [&last_job](ThreadPool&) { last_job = 3; }, now,
         "task 3 (usually runs first)");
 
     while (last_job == 0)
@@ -304,12 +304,12 @@
     // With start time as duration
     last_job = 0;
 
     task1 = pool->add_task(
-        [&last_job](ThreadPoolEngine&) { last_job = 1; }, 120s);
+        [&last_job](ThreadPool&) { last_job = 1; }, 120s);
     pool->add_task(
-        [&last_job](ThreadPoolEngine&) { last_job = 2; }, 2ms,
+        [&last_job](ThreadPool&) { last_job = 2; }, 2ms,
         "task 2 (usually runs second)");
     pool->add_task(
-        [&last_job](ThreadPoolEngine&) { last_job = 3; }, 0ms,
+        [&last_job](ThreadPool&) { last_job = 3; }, 0ms,
         "task 3 (usually runs first)");
 
     while (last_job == 0)
@@ -608,7 +608,7 @@ TEST_CASE("ThreadPool: Tasks scheduling their own continuation", "[ThreadPool]")
     std::string str;
 
     pool->add_task(
-        [&mutex, &str](ThreadPoolEngine& pool)
+        [&mutex, &str](ThreadPool& pool)
         {
             {
                 std::lock_guard<std::mutex> lock(mutex);
-- 
2.40.1
```
Finii commented 7 months ago

This looks really nice!

One thing that could be improved (later) is that it spawns all threads immediately. Maybe switching to lazy thread creation is worthwhile? Like: you want max 20 threads, and then see how many connections come in, and create the 3 threads that we need on demand. If there are never more than 3 concurrent things to do, why have all the threads idling?

The only reason I can think of is to reduce latency when the number of threads increases. But that might be a special case that could be solved by a min_threads parameter in addition to num_threads, which would then act as max_threads.

One could even implement a shrink_to_fit() that ends all unused threads at the user's request.

Finii commented 7 months ago

/spent 4h30m

Finii commented 7 months ago

:woman_facepalming: My test was broken. You just get the exception rethrown, like on a regular future...

But is that a good solution? Maybe there is no other way to check that, technically.

Edit: Yes, I believe the way it is is the only way possible, so :+1:


I have not found a way to tell whether a task that does not return anything finished successfully or finished because it threw an exception. Is there a way?

```cpp
auto task2 = pool.add_task([]() { sleep(1); throw std::runtime_error{"Wanna sleep"}; });
```

task2.is_complete() turns true, and ... how do I find out whether it finished OK or with an error?

There is the get_id() member, which I would remove as it just needlessly exposes an implementation detail. But there is no way to access the future, and so there is no way to check a void task for successful termination?

Maybe it should be possible to call task2.get_result(), which is currently not possible. Or does is_complete() need to be tri-state?

alt-graph commented 7 months ago

> Suggested Patch 2, maybe include something similar

Great idea, applied. Oh – CI errors. Will investigate.

alt-graph commented 7 months ago

> Great idea, applied. Oh – CI errors. Will investigate.

That immediately found another bug in the trim() example. Fixed.

alt-graph commented 7 months ago

> Patch 1: How to make one object out of ThreadPool and ThreadPoolEngine
>
> (Rough, just to show the idea)
>
> The engine_ member must come last of all data members; maybe that needs a comment in the code. It does come last in the diff, but that is easily forgotten over time.

I am not entirely sure if I got the gist of your patch, but to me it looks like it boils down to this:

```cpp
engine_.reset(this, [](ThreadPool*){});
```

This stores the address of the existing ThreadPool in a shared_ptr, which is fine. But TaskHandle still has no safe way of accessing the ThreadPool object: it can call weak_ptr::expired() to find out whether the pool is already gone, but it cannot keep the pool alive via lock() to actually do anything with it. Nothing keeps the original ThreadPool alive unless it was actually allocated inside a shared_ptr; hence the ThreadPoolEngine.

alt-graph commented 7 months ago

> One thing that could be improved (later) is that it spawns all threads immediately. Maybe switching to lazy thread creation is worthwhile? Like: you want max 20 threads, and then see how many connections come in, and create the 3 threads that we need on demand. If there are never more than 3 concurrent things to do, why have all the threads idling?
>
> The only reason I can think of is to reduce latency when the number of threads increases. But that might be a special case that could be solved by a min_threads parameter in addition to num_threads, which would then act as max_threads.
>
> One could even implement a shrink_to_fit() that ends all unused threads at the user's request.

I thought about something like that, too. Adding threads on-the-fly is very easy, but shrinking the pool requires joining the superfluous threads, which can only be done after they finished their work. It gets very messy very soon. I wouldn't discard the idea, but it requires some careful design.

Also, the idling threads do not do any harm except for consuming a few megabytes of memory (and cluttering the view in a debugger). The system scheduler should make sure that they do not consume any CPU cycles because they are blocked on the CV.

Finii commented 7 months ago

BAH, I seem to be unable to reply to comments in a thread if they are direct comments.

**ThreadPool and ThreadPoolEngine**

I overlooked that :grimacing: (i.e., that there is no safe way of accessing it). I still think it feels clunky.

What do you think about having a factory function instead? Like (after unifying the Engine back into the Pool)

```cpp
auto my_pool = gul14::get_thread_pool();

// with
auto get_thread_pool() -> std::shared_ptr<ThreadPool>;
```

At the moment ThreadPool is just a facade for a shared_ptr<ThreadPoolEngine> that forwards all calls. If I have not overlooked something :grimacing: that would be a comparable solution with the same lock-ability, but direct, without an intermediary and without two 'confusing' types.

See also https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#cp61-use-async-to-spawn-concurrent-tasks Edit: The link is not really related, just that it also uses a factory

**Lazy thread creation**

Well, you seem to be thinking of 10 threads or so, but the actual limit max_threads is 10'000. I believe that many threads are not a good idea; one should use fibers or similar, less heavyweight mechanisms for such use cases. I would not play that down in the arguments ('idling threads do no harm').

> Adding threads on-the-fly is very easy

Then, maybe, we should add that right away? Lazy thread creation does not need to be tied to thread removal; those are two distinct features (pointing in the same direction: trying to reduce unneeded resource use). This does not have to be within this PR. But why not add it, if it is easy?

> but shrinking the pool requires joining the superfluous threads, which can only be done after they finished their work

That is why I proposed a shrink_to_fit() that just joins all the threads that are idle at the moment. That should also be rather easy to implement. Set some 'you need to die' flag(s) and wake the thread(s) up, then join them away.

Edit: To be explicit: I believe we should continue without the Lazy stuff and get the PR merged, and possibly continue with the Lazy side of life in a new PR afterwards.

alt-graph commented 7 months ago

> What do you think about having a factory function instead? Like (after unifying the Engine back into the Pool)
>
> ```cpp
> auto my_pool = gul14::get_thread_pool();
>
> // with
> auto get_thread_pool() -> std::shared_ptr<ThreadPool>;
> ```
>
> At the moment ThreadPool is just a facade for a shared_ptr<ThreadPoolEngine> that forwards all calls. If I have not overlooked something 😬 that would be a comparable solution with the same lock-ability, but direct, without an intermediary and without two 'confusing' types.

I also find the two types very confusing, and they duplicate too much code, so :+1: for the factory function idea. I'll try and implement that. Warning: I will probably make the ThreadPool constructor private again. :-D

> I believe we should continue without the Lazy stuff and get the PR merged, and possibly continue with the Lazy side of life in a new PR afterwards.

I have opened an issue (#80) for that.

alt-graph commented 7 months ago

> What do you think about having a factory function instead?

It is now implemented like that. ThreadPoolEngine is gone, we need to call make_thread_pool() instead.

alt-graph commented 7 months ago

Thanks for the review, @Finii! I think that both architecture and usability are much better than in the original version.

> What I do not understand is why we have
>
> * `gul14::TaskState` but
> * `gul14::ThreadPool::TaskHandle`

The TaskHandle needs to access a private member of the ThreadPool, and keeping it separate leads to a mess of circular dependencies between the two classes. All of that can be solved, but it is very messy. The nested class was by far the best solution I found.

TaskState could go anywhere, but since this is a scoped enum, people have to write the full type name a lot:

```cpp
if (handle.get_state() == ThreadPool::TaskState::complete) ...;
if (handle.get_state() == TaskState::complete) ...;
```

I just prefer the second one.