unum-cloud / usearch

Fast Open-Source Search & Clustering engine × for Vectors & 🔜 Strings × in C++, C, Python, JavaScript, Rust, Java, Objective-C, Swift, C#, GoLang, and Wolfram 🔍
https://unum-cloud.github.io/usearch/
Apache License 2.0
1.93k stars 109 forks

Bug: crash when hardware concurrency is exceeded #389

Closed mbautin closed 2 months ago

mbautin commented 2 months ago

Describe the bug

Adding vectors to the index from more threads than std::thread::hardware_concurrency() reports causes a crash. A stack trace from a test (with AddressSanitizer) is available at https://gist.githubusercontent.com/mbautin/80c0d87a0915e7da7b076a055a382b9e/raw

The root cause seems to be an underflow of the available_threads_ vector: back() and pop_back() are called on it after it has become empty. It is initialized as follows:

        available_threads_.resize(std::thread::hardware_concurrency());
        std::iota(available_threads_.begin(), available_threads_.end(), 0ul);

The crash happens in this function:

    thread_lock_t thread_lock_(std::size_t thread_id) const {
        if (thread_id != any_thread())
            return {*this, thread_id, false};

        available_threads_mutex_.lock();
        thread_id = available_threads_.back();
        available_threads_.pop_back();
        available_threads_mutex_.unlock();
        return {*this, thread_id, true};
    }

It is invoked from the add_ function as follows:

    template <typename scalar_at>
    add_result_t add_(                             //
        vector_key_t key, scalar_at const* vector, //
        std::size_t thread, bool force_vector_copy, cast_t const& cast) {

        if (!multi() && contains(key))
            return add_result_t{}.failed("Duplicate keys not allowed in high-level wrappers");

        // Cast the vector, if needed for compatibility with `metric_`
        thread_lock_t lock = thread_lock_(thread);

The add function is invoked with thread = any_thread().
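
To make the failure mode concrete, here is a minimal standalone sketch of a slot pool shaped like available_threads_. It is not USearch code: the `slot_pool` class, its `try_acquire`/`release` methods, and the `count_successful_acquires` helper are hypothetical names used only for illustration. Calling `back()` on an empty `std::vector` is undefined behavior, so this version checks for emptiness and reports exhaustion instead.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <optional>
#include <vector>

// Hypothetical stand-in for the `available_threads_` pool; not USearch code.
class slot_pool {
    std::vector<std::size_t> free_slots_;
    std::mutex mutex_;

  public:
    explicit slot_pool(std::size_t slots) : free_slots_(slots) {
        // Same initialization pattern as in the issue: slots 0, 1, ..., slots - 1.
        std::iota(free_slots_.begin(), free_slots_.end(), std::size_t{0});
    }

    // Returns a free slot, or `std::nullopt` if every slot is taken.
    // An unchecked `back()` + `pop_back()` here would be undefined behavior
    // once the vector is empty.
    std::optional<std::size_t> try_acquire() {
        std::lock_guard<std::mutex> guard(mutex_);
        if (free_slots_.empty())
            return std::nullopt;
        std::size_t slot = free_slots_.back();
        free_slots_.pop_back();
        return slot;
    }

    // Puts a previously acquired slot back into the pool.
    void release(std::size_t slot) {
        std::lock_guard<std::mutex> guard(mutex_);
        free_slots_.push_back(slot);
    }
};

// Counts how many of `callers` requests obtain a slot when nobody releases
// in between, mimicking more simultaneous callers than slots.
inline std::size_t count_successful_acquires(std::size_t slots, std::size_t callers) {
    slot_pool pool(slots);
    std::size_t successes = 0;
    for (std::size_t i = 0; i != callers; ++i)
        if (pool.try_acquire())
            ++successes;
    return successes;
}
```

With 8 slots and 9 callers, the ninth request is the one that would hit the empty vector in the unchecked version.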

I see that this whole mechanism is ultimately needed for providing a "context" object corresponding to the thread. The user could enumerate all the threads that will be performing any index insertion or search operations, and pass a 0-based index of that thread as the thread parameter. It could be possible to call the reserve() function to set the correct threads_add and threads_search, but it does not seem intuitive.

    inline index_limits_t(std::size_t n, std::size_t t) noexcept : members(n), threads_add(t), threads_search(t) {}
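
The explicit-index approach described above can be sketched without USearch: each worker is given a fixed 0-based index and only touches the context stored at that index, so no shared pool of IDs is needed. `context_t` and `run_workers` are hypothetical names for illustration, not part of the library.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical per-thread scratch space, standing in for the library's "context".
struct context_t {
    std::size_t operations = 0;
};

// Spawns `workers` threads; thread `i` uses only `contexts[i]`.
// Returns the total number of operations recorded across all contexts.
inline std::size_t run_workers(std::size_t workers, std::size_t operations_per_worker) {
    std::vector<context_t> contexts(workers); // one slot per known thread
    std::vector<std::thread> threads;
    threads.reserve(workers);
    for (std::size_t thread_index = 0; thread_index != workers; ++thread_index)
        threads.emplace_back([&contexts, thread_index, operations_per_worker] {
            // The index is fixed for the lifetime of the thread, so each
            // context has exactly one writer and needs no locking.
            for (std::size_t i = 0; i != operations_per_worker; ++i)
                ++contexts[thread_index].operations;
        });
    for (std::thread& t : threads)
        t.join();

    std::size_t total = 0;
    for (context_t const& context : contexts)
        total += context.operations;
    return total;
}
```

The trade-off is that the caller must know the full set of worker threads up front, which is the part that does not feel intuitive for a high-level wrapper.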

Steps to reproduce

Run a test that performs more concurrent indexing operations than the hardware concurrency allows. Example test: https://gist.githubusercontent.com/mbautin/6ef7cbbc18b818c1ab969be55a74e899/raw (implemented in YugabyteDB's test framework, but it should be easy to extract as an independent test).

Expected behavior

The system should not crash. It should allow any number of indexing threads to run.
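
One way to meet this expectation, sketched here as an assumption rather than as what USearch does, is to make a caller wait for a free slot instead of reading from an empty pool. The `blocking_slot_pool` class and the `peak_slots_in_use` helper are hypothetical and exist only to illustrate the idea.

```cpp
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical pool that blocks when all slots are taken; not USearch code.
class blocking_slot_pool {
    std::vector<std::size_t> free_slots_;
    std::mutex mutex_;
    std::condition_variable slot_freed_;

  public:
    explicit blocking_slot_pool(std::size_t slots) : free_slots_(slots) {
        std::iota(free_slots_.begin(), free_slots_.end(), std::size_t{0});
    }

    // Waits until a slot is free, then takes it.
    std::size_t acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        slot_freed_.wait(lock, [this] { return !free_slots_.empty(); });
        std::size_t slot = free_slots_.back();
        free_slots_.pop_back();
        return slot;
    }

    // Returns a slot and wakes one waiting caller, if any.
    void release(std::size_t slot) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            free_slots_.push_back(slot);
        }
        slot_freed_.notify_one();
    }
};

// Runs `workers` threads against `slots` slots for `rounds` iterations each
// and returns the highest number of slots observed in use at the same time.
inline std::size_t peak_slots_in_use(std::size_t slots, std::size_t workers, std::size_t rounds) {
    blocking_slot_pool pool(slots);
    std::mutex stats_mutex;
    std::size_t in_use = 0, peak = 0;
    std::vector<std::thread> threads;
    threads.reserve(workers);
    for (std::size_t w = 0; w != workers; ++w)
        threads.emplace_back([&] {
            for (std::size_t r = 0; r != rounds; ++r) {
                std::size_t slot = pool.acquire();
                {
                    std::lock_guard<std::mutex> guard(stats_mutex);
                    peak = std::max(peak, ++in_use);
                }
                // Real work using the slot's context would go here.
                {
                    std::lock_guard<std::mutex> guard(stats_mutex);
                    --in_use;
                }
                pool.release(slot);
            }
        });
    for (std::thread& t : threads)
        t.join();
    return peak;
}
```

With 9 workers and 8 slots, every worker finishes and the peak never exceeds 8. The cost is that surplus threads stall rather than run in parallel, so sizing the pool to the real thread count is still preferable where it is known.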

USearch version

2.10.5

Operating System

AlmaLinux 8.8

Hardware architecture

x86

Which interface are you using?

C++ implementation

Contact Details

mbautin@users.noreply.github.com

Is there an existing issue for this?

Code of Conduct

ashvardanian commented 2 months ago

Just to clarify, @mbautin: does it work fine if you use index_limits_t to increase the number of threads? If so, that's the intended behavior, but we may want to extend the Multi-Threading code snippet in cpp/README.md to show how to use that.

mbautin commented 2 months ago

@ashvardanian unfortunately, simply calling reserve() does not seem to be enough. Here is the relevant part of my current test case. I am using 9 indexing threads below but running the test on an 8-vCPU VM.

  using namespace unum::usearch;

  // Create a metric and index
  const size_t kDimensions = 96;
  metric_punned_t metric(kDimensions, metric_kind_t::l2sq_k, scalar_kind_t::f32_k);

  // Generate and add vectors to the index
  const size_t kNumVectors = ReleaseVsDebugVsAsanVsTsan(100000, 20000, 15000, 10000);
  const size_t kNumIndexingThreads = 9;

  std::uniform_real_distribution<> uniform_distrib(0, 1);

  std::string index_path;
  {
    TestThreadHolder indexing_thread_holder;
    index_dense_config_t index_config;
    index_config.enable_key_lookups = false;
    index_dense_t index = index_dense_t::make(metric, index_config);
    index.reserve(index_limits_t(kNumVectors, kNumIndexingThreads));
    auto load_start_time = MonoTime::Now();
    CountDownLatch latch(kNumIndexingThreads);
    std::atomic<size_t> num_vectors_inserted{0};
    for (size_t thread_index = 0; thread_index < kNumIndexingThreads; ++thread_index) {
      indexing_thread_holder.AddThreadFunctor(
          [&num_vectors_inserted, &index, &latch, &uniform_distrib]() {
            std::random_device rd;
            size_t vector_id;
            while ((vector_id = num_vectors_inserted.fetch_add(1)) < kNumVectors) {
              auto vec = GenerateRandomVector(kDimensions, uniform_distrib);
              ASSERT_TRUE(index.add(vector_id, vec.data()));
            }
            latch.CountDown();
          });
    }
    latch.Wait();
    auto load_elapsed_usec = (MonoTime::Now() - load_start_time).ToMicroseconds();
    ReportPerf("Indexed", kNumVectors, "vectors", kDimensions, load_elapsed_usec,
               kNumIndexingThreads);

    // Save the index to a file
    index_path = GetTestDataDirectory() + "/hnsw_index.usearch";
    ASSERT_TRUE(index.save(index_path.c_str()));
  }

This produces the same ASAN issue as before: https://gist.githubusercontent.com/mbautin/9dc69a931dc28c60093f60a2247b0a99/raw/83a3aad89d5a3b129d02676c6f546370819f6d01/gistfile1.txt

    thread_lock_t thread_lock_(std::size_t thread_id) const {
        if (thread_id != any_thread())
            return {*this, thread_id, false};

        available_threads_mutex_.lock();
        thread_id = available_threads_.back();  // Crashes here
        available_threads_.pop_back();
        available_threads_mutex_.unlock();
        return {*this, thread_id, true};
    }

This is because available_threads_ does not take the configuration passed to reserve() into account:

    result.available_threads_.resize(hardware_threads);

ashvardanian commented 2 months ago

Nice catch! I'll ship a patch in a couple of hours 🤗

mbautin commented 2 months ago

The following tentative patch made it work for me: https://gist.githubusercontent.com/mbautin/1cf8ebe5ff01442b4a2431d1cc189a8d/raw
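
The linked patch is not reproduced here. As a rough sketch of the general idea only, the pool could be sized from the limits passed to reserve() rather than from the hardware thread count. `thread_limits_t` and `make_available_threads` are hypothetical names, and taking the larger of the two limits is an assumption; a real fix might need their sum if insertions and searches run at the same time.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical mirror of the two thread-related fields of `index_limits_t`.
struct thread_limits_t {
    std::size_t threads_add;
    std::size_t threads_search;
};

// Builds the list of available thread slots from the requested limits.
inline std::vector<std::size_t> make_available_threads(thread_limits_t limits) {
    // Assumption: one slot per thread of the larger of the two limits.
    std::size_t slots = std::max(limits.threads_add, limits.threads_search);
    // If nothing was requested, fall back to the hardware thread count.
    // `hardware_concurrency()` may return 0, so keep at least one slot.
    if (slots == 0)
        slots = std::max<std::size_t>(std::thread::hardware_concurrency(), 1);
    std::vector<std::size_t> available(slots);
    std::iota(available.begin(), available.end(), std::size_t{0});
    return available;
}
```

For the test case above, limits of 9 and 9 would yield slots 0 through 8, one per indexing thread, regardless of how many vCPUs the VM has.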

ashvardanian commented 2 months ago

:tada: This issue has been resolved in version 2.11.1 :tada:

The release is available on GitHub release

Your semantic-release bot :package::rocket: