mxmlnkn / rapidgzip

Gzip Decompression and Random Access for Modern Multi-Core Machines
Apache License 2.0

Dynamic thread assignment #31

Open. marekkokot opened this issue 6 months ago

marekkokot commented 6 months ago

The question: Is there a way to change the number of threads used after initialization?

The story behind the question: Let's assume I have a program that reads from a large input gz file. The decompressed content could be processed in parallel. With standard zlib, parallelization is limited by the single-threaded decompression bandwidth. With rapidgzip, I could improve the decompression bandwidth, but the problem becomes how to assign threads. I would like to adjust the number of threads the decompressor uses dynamically: in this scenario, I would assign all threads to decompression at the beginning and then start using some of them to process the decompressed data. Depending on the amount of data already decompressed, I would dynamically reassign threads between the producer (decompressor) and the consumers.
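
Schematically, the pattern I have in mind looks like the following rough sketch. readChunk and process are stand-ins for the real rapidgzip read call and my actual per-chunk work; only the thread choreography matters here.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

/* Stand-in for the real decompression call; the actual program would pull
 * decompressed bytes out of a rapidgzip reader and return std::nullopt at EOF. */
std::optional<std::string>
readChunk()
{
    static int callCount{ 0 };
    return callCount++ < 10 ? std::optional<std::string>{ "decompressed data" } : std::nullopt;
}

/* Stand-in for the actual per-chunk consumer work. */
void
process( const std::string& ) {}

int
main()
{
    std::mutex mutex;
    std::condition_variable queueChanged;
    std::deque<std::string> queue;
    bool finished{ false };

    /* Producer: keeps the decompressor busy by reading ahead into the queue. */
    std::thread producer{ [&] {
        while ( auto chunk = readChunk() ) {
            const std::lock_guard lock( mutex );
            queue.emplace_back( std::move( *chunk ) );
            queueChanged.notify_one();
        }
        const std::lock_guard lock( mutex );
        finished = true;
        queueChanged.notify_all();
    } };

    /* Consumer loop: pops chunks until the producer signals EOF. The plan is
     * to start more of these whenever the queue keeps growing, i.e., when
     * decompression outpaces processing. */
    const auto consume = [&] {
        while ( true ) {
            std::unique_lock lock( mutex );
            queueChanged.wait( lock, [&] { return !queue.empty() || finished; } );
            if ( queue.empty() ) {
                return;
            }
            auto chunk = std::move( queue.front() );
            queue.pop_front();
            lock.unlock();
            process( chunk );
        }
    };

    std::vector<std::thread> consumers;
    consumers.emplace_back( consume );  /* grown or shrunk dynamically in the real program */

    producer.join();
    for ( auto& consumer : consumers ) {
        consumer.join();
    }
    return 0;
}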

mxmlnkn commented 6 months ago

This is currently not possible and might require some refactoring to do right. Is it important that the threads are actually closed? If this is just about producer/consumer matching, then rapidgzip should effectively do less work on its threads once the consumer becomes too slow. If you have N threads and you are reading chunk I, then for sequential access the prefetcher will dispatch at most chunks up to I+N to the thread pool. So, if you are reading only one chunk per second, then only one chunk per second will be dispatched to the thread pool. If the decompression of that chunk takes less than a second, then you will effectively use only one thread of the pool.
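
To illustrate the sequential case with a simplified model (this is not the actual prefetcher code, just the effect described above):

#include <cstddef>

/* Simplified model of the backpressure described above: with a pool of
 * poolSize threads, reading chunk currentChunk only ever submits
 * decompression jobs up to chunk currentChunk + poolSize. A reader that
 * advances slowly therefore submits few new jobs per read, and most pool
 * threads stay idle on their own. */
template<typename SubmitToPool>
void
dispatchOnRead( std::size_t  currentChunk,
                std::size_t  poolSize,
                std::size_t& highestSubmitted,
                SubmitToPool submit )
{
    const std::size_t prefetchLimit = currentChunk + poolSize;
    while ( highestSubmitted < prefetchLimit ) {
        submit( ++highestSubmitted );  /* enqueue this chunk index for decompression */
    }
}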

marekkokot commented 6 months ago

Thanks for the fast response! It's not important that the threads are closed; leaving them idle is fine. Is there a way to check the number of active rapidgzip threads? I will probably need to experiment with this a little. My general goal is not to exceed a given total number of threads while using as many of them as possible. So, for example, if my consumer is too slow, I will use more consumers. As I understand it, when I add a new consumer, rapidgzip will also start to use more threads. Thank you again for your response.

mxmlnkn commented 6 months ago

Is there a way to check the number of active rapidgzip threads?

Not yet. You can add it, e.g., like this:

diff --git a/src/core/BlockFetcher.hpp b/src/core/BlockFetcher.hpp
index 448b1cca..0f4c0bed 100644
--- a/src/core/BlockFetcher.hpp
+++ b/src/core/BlockFetcher.hpp
@@ -570,6 +570,12 @@ private:
         return resultFuture;
     }

+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        return m_threadPool.busyThreadCount();
+    }
+
 protected:
     [[nodiscard]] virtual BlockData
     decodeBlock( size_t blockOffset,
diff --git a/src/core/ThreadPool.hpp b/src/core/ThreadPool.hpp
index 3ac2d537..ff6c6414 100644
--- a/src/core/ThreadPool.hpp
+++ b/src/core/ThreadPool.hpp
@@ -178,6 +178,13 @@ public:
                                 [] ( size_t sum, const auto& tasks ) { return sum + tasks.second.size(); } );
     }

+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        const std::lock_guard lock( m_mutex );
+        return m_threadCount - m_idleThreadCount;
+    }
+
 private:
     /**
      * Does not lock! Therefore it is a private method that should only be called with a lock.
diff --git a/src/rapidgzip/ParallelGzipReader.hpp b/src/rapidgzip/ParallelGzipReader.hpp
index ac286d5d..ae13d5d8 100644
--- a/src/rapidgzip/ParallelGzipReader.hpp
+++ b/src/rapidgzip/ParallelGzipReader.hpp
@@ -933,6 +933,12 @@ public:
         m_deflateStreamCRC32s.insert_or_assign( endOfStreamOffsetInBytes, crc32 );
     }

+    [[nodiscard]] size_t
+    busyThreadCount() const
+    {
+        return m_chunkFetcher ? m_chunkFetcher->busyThreadCount() : 0;
+    }
+
 private:
     BlockFinder&
     blockFinder()
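
With such a patch applied, the code that manages your consumers could poll the new method, e.g. (untested sketch; Reader stands in for the concrete ParallelGzipReader instantiation):

#include <cstddef>

/* Untested sketch built on the busyThreadCount() added by the patch above:
 * computes how many threads of a fixed budget are currently not used by
 * rapidgzip and could thus be given to consumers. */
template<typename Reader>
std::size_t
freeThreadCount( const Reader& reader, std::size_t totalThreads )
{
    const std::size_t busy = reader.busyThreadCount();
    return totalThreads > busy ? totalThreads - busy : 0;
}

Polling this periodically and spawning or parking consumer threads based on the result should approximate the dynamic reassignment you described.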

As I understand it, when I add a new consumer, rapidgzip will also start to use more threads.

Yes.