crossbeam-rs / crossbeam

Tools for concurrent programming in Rust
Apache License 2.0

Thread-safe, shared-state concurrency with `WaitGroup` #1046

Open siennathesane opened 7 months ago

siennathesane commented 7 months ago

I have a complex use case for crossbeam_utils::sync::WaitGroup. I have a thread-safe structure that builds SSTables for databases, and it is very large in-memory, so it can't easily be cloned. As part of that, the table builder needs to be able to compress and encrypt the blocks when they're complete, but that work is done asynchronously. I want to use WaitGroup to provide the final sync point before I flush the SSTable to disk.

Here's my current MVCE (playground link).

use bytes::Bytes;
use crossbeam_deque::Worker;
use crossbeam_utils::sync::WaitGroup;
use parking_lot::Mutex;
use std::{sync::Arc, thread, thread::available_parallelism};

/// Block on disk
struct Block {
    data: Bytes, // several megabytes
}

/// Thread-safe SSTable Builder
struct Builder {
    wg: Arc<WaitGroup>,
    done: Arc<Mutex<bool>>,
    blocks: Arc<Mutex<Vec<Block>>>, // could be 100s of MiBs until it's flushed
    work_queue: Arc<Mutex<Worker<usize>>>,
}

impl Builder {
    /// Returns an Arc<Builder> to ensure that cloning doesn't clone hundreds
    /// of megabytes
    pub fn new() -> Arc<Self> {
        let f = Arc::new(Builder {
            wg: Arc::new(WaitGroup::new()),
            done: Arc::new(Mutex::new(false)),
            blocks: Arc::new(Mutex::new(vec![])),
            work_queue: Arc::new(Mutex::new(Worker::<usize>::new_lifo())),
        });

        // spin up the internal data workers
        let p_count = available_parallelism().unwrap().get();
        for _ in 0..=p_count {
            let f_alias = f.clone();
            thread::spawn(move || {
                f_alias.worker();
            });
        }

        f
    }

    pub fn add(&self, _key: Bytes, _value: Bytes) {
        // add to block
    }

    pub fn complete(&self) {
        // inform workers there's no more work
        {
            let mut done = self.done.lock();
            *done = true;
        }

        // ensure the work is complete
        self.wg.wait();

        // flush blocks to disk
    }

    // a worker thread
    fn worker(&self) {
        let wg = self.wg.clone();

        while !*self.done.lock() {
            let stealer = self.work_queue.lock().stealer();
            let idx = stealer.steal().success().unwrap();

            let mut block_list_ref = self.blocks.lock();
            let _block = &mut block_list_ref[idx];

            // compress and encrypt the block stolen from the queue
            // this modifies the block vec in-place
        }

        drop(wg);
    }
}

fn main() {
    // we need a new sstable
    let f = Builder::new();

    // some other threads will call this
    // f.add(key, value);

    // sstable is complete
    // finalize all blocks
    // write to disk
    f.complete();
}

Because of the size of the data in the blocks, and the overall builder size, moving the data around isn't very feasible, so I mutate and operate on it in-place as much as possible. I pass pointers around, and everything is behind Arc, and most are also locked with mutexes. However, when I tried to introduce WaitGroup, I got this error:

   Compiling playground v0.0.1 (/playground)
error[E0507]: cannot move out of an `Arc`
   --> src/lib.rs:53:9
    |
53  |         self.wg.wait();
    |         ^^^^^^^ ------ value moved due to this method call
    |         |
    |         move occurs because value has type `WaitGroup`, which does not implement the `Copy` trait
    |
note: `WaitGroup::wait` takes ownership of the receiver `self`, which moves value
   --> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-utils-0.8.16/src/sync/wait_group.rs:103:17
    |
103 |     pub fn wait(self) {
    |                 ^^^^
help: you can `clone` the value and consume it, but this might not be your desired behavior
    |
53  |         self.wg.clone().wait();
    |                ++++++++

For more information about this error, try `rustc --explain E0507`.
error: could not compile `playground` (lib) due to previous error

I've tried several different approaches with pointers, smart pointers, and mutexes, but I cannot seem to get WaitGroup to meet my needs. It would be great if WaitGroup could support this kind of use case.
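One possible way around the E0507 error, sketched under assumptions: because `wait` takes `self` by value, the builder can keep its handle in a `Mutex<Option<_>>` and `take()` it inside `complete(&self)`. To keep the snippet runnable with only the standard library, the `WaitGroup` below is a hypothetical stand-in built on `Mutex` and `Condvar`, not crossbeam's implementation; the `processed` counter and the `workers` parameter are also illustrative. Each worker's handle is cloned before its thread is spawned, so the count cannot reach zero before a worker has registered.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for a wait group: every live handle counts as one
/// participant, and dropping a handle signs that participant off.
struct WaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl WaitGroup {
    fn new() -> Self {
        WaitGroup { inner: Arc::new((Mutex::new(1), Condvar::new())) }
    }

    /// Consumes the handle, mirroring the `wait(self)` signature in the error.
    fn wait(self) {
        let inner = Arc::clone(&self.inner);
        drop(self); // sign off our own handle before blocking
        let (count, cvar) = &*inner;
        let mut n = count.lock().unwrap();
        while *n > 0 {
            n = cvar.wait(n).unwrap();
        }
    }
}

impl Clone for WaitGroup {
    fn clone(&self) -> Self {
        *self.inner.0.lock().unwrap() += 1;
        WaitGroup { inner: Arc::clone(&self.inner) }
    }
}

impl Drop for WaitGroup {
    fn drop(&mut self) {
        let (count, cvar) = &*self.inner;
        let mut n = count.lock().unwrap();
        *n -= 1;
        if *n == 0 {
            cvar.notify_all();
        }
    }
}

struct Builder {
    // `Option` lets `complete(&self)` move the handle out from behind `&self`.
    wg: Mutex<Option<WaitGroup>>,
    processed: Mutex<usize>, // illustrative stand-in for the block list
}

impl Builder {
    fn new(workers: usize) -> Arc<Self> {
        let wg = WaitGroup::new();
        let b = Arc::new(Builder {
            wg: Mutex::new(Some(wg.clone())),
            processed: Mutex::new(0),
        });
        for _ in 0..workers {
            let handle = wg.clone(); // registered before the thread starts
            let b2 = Arc::clone(&b);
            thread::spawn(move || {
                *b2.processed.lock().unwrap() += 1; // stands in for real work
                drop(handle); // this worker is finished
            });
        }
        b // the local `wg` is dropped here, leaving the builder's handle
    }

    fn complete(&self) -> usize {
        // `take()` leaves `None` behind, so a second call simply skips the wait.
        let wg = self.wg.lock().unwrap().take();
        if let Some(wg) = wg {
            wg.wait();
        }
        *self.processed.lock().unwrap()
    }
}

fn main() {
    let b = Builder::new(4);
    assert_eq!(b.complete(), 4);
}
```

The same `Mutex<Option<_>>` shape should carry over to the real `crossbeam_utils::sync::WaitGroup`, which is also `Clone` and also consumed by `wait`, though that has not been tested here.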

taiki-e commented 7 months ago

I guess you want a variant of barrier that does not need to know the number of threads at construction, right? https://docs.rs/crossbeam-utils/latest/crossbeam_utils/sync/struct.WaitGroup.html#wait-groups-vs-barriers

siennathesane commented 7 months ago

> I guess you want a variant of barrier that does not need to know the number of threads at construction, right?

Yes, and one that is potentially scalable as well. WaitGroup allows users to add or remove threads as needed, which makes it useful as a "dumb barrier" of sorts. I can likely use a barrier as a drop-in replacement for now (I haven't tried yet), but I wanted to surface this type of use case.
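For reference, a rough sketch of what the barrier fallback might look like using only `std::sync::Barrier`. It assumes the worker count is fixed up front, which is exactly the limitation discussed above; `finalize_blocks`, the `Vec<Vec<u8>>` block type, and the byte reversal standing in for compression and encryption are all hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Processes every block in place on `workers` threads, then meets them on a
/// barrier sized for all workers plus the calling thread.
fn finalize_blocks(blocks: Vec<Vec<u8>>, workers: usize) -> Vec<Vec<u8>> {
    let blocks = Arc::new(Mutex::new(blocks));
    let next = Arc::new(AtomicUsize::new(0)); // next block index to claim
    let barrier = Arc::new(Barrier::new(workers + 1)); // count fixed here

    for _ in 0..workers {
        let blocks = Arc::clone(&blocks);
        let next = Arc::clone(&next);
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            loop {
                // Each index is handed out exactly once.
                let idx = next.fetch_add(1, Ordering::Relaxed);
                let mut list = blocks.lock().unwrap();
                match list.get_mut(idx) {
                    // Reversal stands in for compress + encrypt.
                    Some(block) => block.reverse(),
                    None => break, // no blocks left
                }
            }
            barrier.wait(); // this worker is done
        });
    }

    barrier.wait(); // returns once every worker has arrived
    let out = std::mem::take(&mut *blocks.lock().unwrap());
    out
}

fn main() {
    let out = finalize_blocks(vec![vec![1, 2, 3], vec![4, 5]], 3);
    assert_eq!(out, vec![vec![3, 2, 1], vec![5, 4]]);
}
```

Unlike a wait group, the barrier cannot absorb a worker that is added later: spawning one more thread would require constructing a new `Barrier` with a larger count.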