SerenityOS / serenity

The Serenity Operating System 🐞
https://serenityos.org
BSD 2-Clause "Simplified" License

LibCompress(+AK+LibHTTP): Implement streamable asynchronous deflate and zlib decompression #24567

Open · DanShaders opened 2 weeks ago

DanShaders commented 2 weeks ago

This PR builds upon previous work in the foundational coroutines PR and implements streamable, asynchronous, error-safe, and EOF-correct decompression. Incidentally, the new asynchronous implementation is about 2 times faster than our previous synchronous one.
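
For reviewers who mostly care about the consumer-facing shape of the new API, here is roughly what a read loop looks like. This is only a sketch condensed from the `inflate_async_1gb` benchmark further down in this thread: the helper name and the `NonnullOwnPtr<AsyncInputStream>` parameter are just for illustration, while the decompressor calls mirror the benchmark.

```cpp
// Sketch only: drain an async deflate stream and count the decompressed bytes.
// `input` is any AsyncInputStream implementation (e.g. a file-backed one).
Coroutine<ErrorOr<size_t>> count_inflated_bytes(NonnullOwnPtr<AsyncInputStream> input)
{
    auto decompressor = Compress::Async::DeflateDecompressor { move(input) };
    size_t total = 0;
    while (true) {
        // peek_or_eof() yields the currently buffered decompressed data plus an EOF flag.
        auto [data, is_eof] = CO_TRY(co_await decompressor.peek_or_eof());
        if (is_eof)
            break;
        total += data.size();
        // Mark the peeked bytes as consumed; since the data was just peeked, this should
        // complete synchronously (same pattern as in the benchmark below).
        must_sync(decompressor.read(data.size()));
    }
    co_return total;
}
```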

In the future, to address the code duplication this PR introduces, I plan to create an AsyncStream -> Stream translation mechanism and use it to reroute the old classes to the new implementation (a rough sketch of the shape I have in mind follows).
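
Something along these lines, purely as a sketch: none of the names below exist yet, and it assumes that `Core::run_async_in_new_event_loop` drives a coroutine to completion before returning and that `AsyncInputStream` exposes the same `peek_or_eof()`/`read()` pair as the decompressor does.

```cpp
// Hypothetical AsyncInputStream -> Stream shim; not part of this PR, includes omitted.
class BlockingInputStreamAdapter final : public Stream {
public:
    explicit BlockingInputStreamAdapter(NonnullOwnPtr<AsyncInputStream> inner)
        : m_inner(move(inner))
    {
    }

    ErrorOr<Bytes> read_some(Bytes bytes) override
    {
        ErrorOr<Bytes> result = Bytes {};
        // Drive the asynchronous read to completion on a private event loop.
        Core::run_async_in_new_event_loop([&] -> Coroutine<void> {
            auto peeked = co_await m_inner->peek_or_eof();
            if (peeked.is_error()) {
                result = peeked.release_error();
                co_return;
            }
            auto [data, is_eof] = peeked.release_value();
            if (is_eof) {
                m_saw_eof = true;
                co_return;
            }
            auto to_copy = min(data.size(), bytes.size());
            data.trim(to_copy).copy_to(bytes);
            must_sync(m_inner->read(to_copy)); // consume exactly what we copied out
            result = bytes.trim(to_copy);
        });
        return result;
    }

    ErrorOr<size_t> write_some(ReadonlyBytes) override { return Error::from_errno(EBADF); }
    bool is_eof() const override { return m_saw_eof; }
    bool is_open() const override { return m_inner->is_open(); }

    void close() override
    {
        Core::run_async_in_new_event_loop([&] -> Coroutine<void> {
            (void)co_await m_inner->close();
        });
    }

private:
    NonnullOwnPtr<AsyncInputStream> m_inner;
    bool m_saw_eof { false };
};
```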

(The Deflate algorithm itself is pretty much copied verbatim from the old implementation, so it probably doesn't require as much attention as the surrounding scaffolding.)

@timschumi, I'm sorry for yet another gigantic PR :).

DanShaders commented 2 days ago

@timschumi, here are the performance numbers on an i9-13900H (they were physically quite painful to get, because my laptop doesn't provide enough cooling for even a single core to run at max turbo frequency for a full minute):

| testcase | time |
| --- | --- |
| inflate_async_1gb (factor = 2.0) | 3655.9±49.9ms (min=3570ms, max=3719ms, total=36559ms) |
| inflate_async_1gb (factor = 1.0) | 3755.1±48.1ms (min=3675ms, max=3814ms, total=37551ms) |
| inflate_sync_1gb | 6236.7±55.3ms (min=6124ms, max=6294ms, total=62367ms) |

I cannot immediately tell what's wrong with your argument against `optimization_factor`, but empirically it seems to give a ~3% boost from a 2-line diff.
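
For reference, plugging in the numbers from the table above: going from factor = 1.0 to factor = 2.0 saves (3755.1 - 3655.9) / 3755.1 ≈ 2.6%, and on this file the async path is 6236.7 / 3655.9 ≈ 1.7x faster than the synchronous one.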

The file I used for benchmarking is some old 1 GB wiki dump: https://www.dshpr.com/wiki-dump-1g. The benchmark itself is:

```diff
diff --git a/Tests/LibCompress/TestAsyncDeflate.cpp b/Tests/LibCompress/TestAsyncDeflate.cpp
index c0fd3b798a..92ad4431f3 100644
--- a/Tests/LibCompress/TestAsyncDeflate.cpp
+++ b/Tests/LibCompress/TestAsyncDeflate.cpp
@@ -4,6 +4,9 @@
  * SPDX-License-Identifier: BSD-2-Clause
  */
 
+#include "LibCompress/Deflate.h"
+#include "LibCore/File.h"
+#include "LibCore/System.h"
 #include <...>
 #include <...>
 #include <...>
@@ -152,3 +155,89 @@ ASYNC_TEST_CASE(decompression)
         });
     }
 }
+
+BENCHMARK_CASE(inflate_sync_1gb)
+{
+    auto path = "/home/danklishch/code/compression/raw2"sv;
+    auto test_file = TRY_OR_FAIL(Core::File::open(path, Core::File::OpenMode::Read));
+    auto test_data = TRY_OR_FAIL(test_file->read_until_eof());
+    auto result = TRY_OR_FAIL(Compress::DeflateDecompressor::decompress_all(test_data));
+    EXPECT_EQ(result.size(), 1'000'000'000UZ);
+}
+
+class InputFileStream : public AsyncInputStream {
+public:
+    InputFileStream(int fd)
+        : m_fd(fd)
+    {
+    }
+
+    ~InputFileStream()
+    {
+        if (m_is_open)
+            reset();
+    }
+
+    void reset() override
+    {
+        VERIFY(m_is_open);
+        m_is_open = false;
+        MUST(Core::System::close(m_fd));
+    }
+
+    Coroutine<ErrorOr<void>> close() override
+    {
+        reset();
+        co_return {};
+    }
+
+    bool is_open() const override { return m_is_open; }
+
+    Coroutine<ErrorOr<bool>> enqueue_some(Badge<AsyncInputStream>) override
+    {
+        VERIFY(m_is_open);
+        size_t nwritten = CO_TRY(co_await m_buffer.enqueue(chunk_size, [&](Bytes bytes) -> Coroutine<ErrorOr<size_t>> {
+            auto read_result = Core::System::read(m_fd, bytes);
+            if (read_result.is_error())
+                reset();
+            co_return read_result;
+        }));
+        co_return nwritten != 0;
+    }
+
+    ReadonlyBytes buffered_data_unchecked(Badge<AsyncInputStream>) const override
+    {
+        return m_buffer.data();
+    }
+
+    void dequeue(Badge<AsyncInputStream>, size_t bytes) override
+    {
+        m_buffer.dequeue(bytes);
+    }
+
+private:
+    static constexpr int chunk_size = 65536;
+
+    bool m_is_open { true };
+    int m_fd { -1 };
+    StreamBuffer m_buffer;
+};
+
+BENCHMARK_CASE(inflate_async_1gb)
+{
+    Core::run_async_in_new_event_loop([&] -> Coroutine<void> {
+        auto path = "/home/danklishch/code/compression/raw2"sv;
+        int fd = CO_TRY_OR_FAIL(Core::System::open(path, O_RDONLY));
+        auto input = make<InputFileStream>(fd);
+        auto decompressor = Compress::Async::DeflateDecompressor { move(input) };
+        size_t nread = 0;
+        while (true) {
+            auto [data, is_eof] = CO_TRY_OR_FAIL(co_await decompressor.peek_or_eof());
+            if (is_eof)
+                break;
+            nread += data.size();
+            must_sync(decompressor.read(data.size()));
+        }
+        EXPECT_EQ(nread, 1'000'000'000UZ);
+    });
+}
```