Thread-safe, shared-state concurrency with WaitGroup #1046

Open
siennathesane opened this issue Dec 6, 2023 · 2 comments

@siennathesane

I have a complex use case for crossbeam_utils::sync::WaitGroup. I have a thread-safe structure that builds SSTables for databases, and it is very large in memory, so it can't easily be cloned. As part of that, the table builder needs to compress and encrypt blocks once they're complete, but that work is done asynchronously. I want to use WaitGroup as the final sync point before I flush the SSTable to disk.

Here's my current MVCE (playground link).

```rust
use bytes::Bytes;
use crossbeam_deque::Worker;
use crossbeam_utils::sync::WaitGroup;
use parking_lot::Mutex;
use std::{sync::Arc, thread, thread::available_parallelism};

/// Block on disk
struct Block {
    data: Bytes, // several megabytes
}

/// Thread-safe SSTable Builder
struct Builder {
    wg: Arc<WaitGroup>,
    done: Arc<Mutex<bool>>,
    blocks: Arc<Mutex<Vec<Block>>>, // could be 100s of MiBs until it's flushed
    work_queue: Arc<Mutex<Worker<usize>>>,
}

impl Builder {
    /// Returns an Arc<Builder> to ensure that cloning doesn't clone hundreds
    /// of megabytes
    pub fn new() -> Arc<Self> {
        let f = Arc::new(Builder {
            wg: Arc::new(WaitGroup::new()),
            done: Arc::new(Mutex::new(false)),
            blocks: Arc::new(Mutex::new(vec![])),
            work_queue: Arc::new(Mutex::new(Worker::<usize>::new_lifo())),
        });

        // spin up the internal data workers
        let p_count = available_parallelism().unwrap().get();
        for _ in 0..=p_count {
            let f_alias = f.clone();
            thread::spawn(move || {
                f_alias.worker();
            });
        }

        f
    }

    pub fn add(&self, _key: Bytes, _value: Bytes) {
        // add to block
    }

    pub fn complete(&self) {
        // inform workers there's no more work
        {
            let mut done = self.done.lock();
            *done = true;
        }

        // ensure the work is complete
        self.wg.wait(); // error[E0507]: wait(self) can't move the WaitGroup out of the Arc

        // flush blocks to disk
    }

    // a worker thread
    fn worker(&self) {
        let wg = self.wg.clone(); // clones the Arc<WaitGroup>, not the WaitGroup itself

        while !*self.done.lock() {
            let stealer = self.work_queue.lock().stealer();
            let idx = stealer.steal().success().unwrap();

            let mut block_list_ref = self.blocks.lock();
            let _block = &mut block_list_ref[idx];

            // compress and encrypt the block stolen from the queue
            // this modifies the block vec in-place
        }

        drop(wg);
    }
}

fn main() {
    // we need a new sstable
    let f = Builder::new();

    // some other threads will call this
    // f.add(key, value);

    // sstable is complete
    // finalize all blocks
    // write to disk
    f.complete();
}
```
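Separately from the `WaitGroup` question, two details in the worker loop above may bite once it runs: `stealer.steal().success().unwrap()` panics whenever the queue is momentarily empty (`steal()` can return `Steal::Empty` or `Steal::Retry`), and `0..=p_count` spawns `p_count + 1` workers, which may not be intended. The sketch below is one std-only alternative, assuming a `std::sync::mpsc` channel is acceptable in place of `crossbeam_deque`: closing the channel replaces the `done` flag, so idle workers block in `recv()` instead of polling, and per-block mutexes let different blocks be processed in parallel. `process_block` and `run_workers` are hypothetical names, and `Vec<u8>` stands in for the real `Block`.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical per-block work; doubles each byte as a stand-in for compress + encrypt.
fn process_block(block: &mut [u8]) {
    for b in block.iter_mut() {
        *b = b.wrapping_mul(2);
    }
}

/// Spawns `workers` threads that pull block indices from a channel until it is closed.
fn run_workers(blocks: Arc<Vec<Mutex<Vec<u8>>>>, indices: Vec<usize>, workers: usize) {
    let (tx, rx) = mpsc::channel::<usize>();
    // `Receiver` is not `Sync`, so the workers share it behind a `Mutex`.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let blocks = Arc::clone(&blocks);
            thread::spawn(move || loop {
                // The receiver guard is released at the end of this statement,
                // so other workers can receive while this one is busy.
                let msg = rx.lock().unwrap().recv();
                match msg {
                    // Only this block's mutex is held while it is processed.
                    Ok(idx) => process_block(&mut blocks[idx].lock().unwrap()),
                    // Every sender has been dropped: no more work will arrive.
                    Err(_) => break,
                }
            })
        })
        .collect();

    for idx in indices {
        tx.send(idx).expect("receiver dropped");
    }
    // Dropping the only sender is the "done" signal; no polling flag is needed.
    drop(tx);

    // Joining the workers is the final sync point before flushing.
    for handle in handles {
        handle.join().expect("worker panicked");
    }
}

fn main() {
    let blocks = Arc::new(vec![Mutex::new(vec![1u8, 2]), Mutex::new(vec![3u8])]);
    run_workers(Arc::clone(&blocks), vec![0, 1], 2);
    assert_eq!(*blocks[0].lock().unwrap(), vec![2u8, 4]);
    assert_eq!(*blocks[1].lock().unwrap(), vec![6u8]);
}
```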

Because of the size of the data in the blocks, and of the builder overall, moving the data around isn't really feasible, so I mutate it in place as much as possible. I pass pointers around: everything is behind an Arc, and most of it is also guarded by a mutex. However, when I tried to introduce WaitGroup, I got this error:

```text
   Compiling playground v0.0.1 (/playground)
error[E0507]: cannot move out of an `Arc`
   --> src/lib.rs:53:9
    |
53  |         self.wg.wait();
    |         ^^^^^^^ ------ value moved due to this method call
    |         |
    |         move occurs because value has type `WaitGroup`, which does not implement the `Copy` trait
    |
note: `WaitGroup::wait` takes ownership of the receiver `self`, which moves value
   --> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-utils-0.8.16/src/sync/wait_group.rs:103:17
    |
103 |     pub fn wait(self) {
    |                 ^^^^
help: you can `clone` the value and consume it, but this might not be your desired behavior
    |
53  |         self.wg.clone().wait();
    |                ++++++++

For more information about this error, try `rustc --explain E0507`.
error: could not compile `playground` (lib) due to previous error
```
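With `wg: Arc<WaitGroup>` as in the repro, the compiler's hint is unlikely to help: `self.wg.clone()` resolves to `Arc::clone`, so `.wait()` would still try to move the `WaitGroup` out of an `Arc`. The same detail affects `worker()`, whose `self.wg.clone()` is also an `Arc` clone, so dropping it never deregisters the worker. And even a genuine `WaitGroup` clone would block forever, because the builder's own handle stays alive as long as the builder does. One common workaround is to hand each worker its own `WaitGroup` clone at spawn time, keep the coordinator's handle in a `Mutex<Option<_>>`, and `take()` it out in `complete(&self)` so it can be consumed. The sketch below shows that hand-off using `MiniWaitGroup`, a hypothetical std-only stand-in (so the example builds without dependencies) that mimics the documented `WaitGroup` behavior: cloning registers a participant, dropping deregisters it, and `wait(self)` blocks until none remain. `Builder` here is a stripped-down illustration, not the reporter's type.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical std-only stand-in mimicking crossbeam's `WaitGroup`:
/// clone = register a participant, drop = deregister, `wait(self)` = block until none remain.
struct MiniWaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl MiniWaitGroup {
    fn new() -> Self {
        // The handle returned here is the first participant.
        MiniWaitGroup { inner: Arc::new((Mutex::new(1), Condvar::new())) }
    }

    /// Consumes this handle and blocks until every other handle has been dropped.
    fn wait(self) {
        let inner = Arc::clone(&self.inner);
        drop(self); // deregister this handle first (`Drop` decrements the count)
        let (lock, cvar) = &*inner;
        let mut count = lock.lock().unwrap();
        while *count > 0 {
            count = cvar.wait(count).unwrap();
        }
    }
}

impl Clone for MiniWaitGroup {
    fn clone(&self) -> Self {
        *self.inner.0.lock().unwrap() += 1;
        MiniWaitGroup { inner: Arc::clone(&self.inner) }
    }
}

impl Drop for MiniWaitGroup {
    fn drop(&mut self) {
        let (lock, cvar) = &*self.inner;
        let mut count = lock.lock().unwrap();
        *count -= 1;
        if *count == 0 {
            cvar.notify_all();
        }
    }
}

/// Hypothetical builder showing the `Mutex<Option<_>>` hand-off.
struct Builder {
    // `Option` lets `complete(&self)` move the handle out and consume it.
    wg: Mutex<Option<MiniWaitGroup>>,
    finished: Mutex<usize>,
}

impl Builder {
    fn new(workers: usize) -> Arc<Self> {
        let wg = MiniWaitGroup::new();
        // One real wait-group clone per worker (not an `Arc` clone).
        let worker_wgs: Vec<MiniWaitGroup> = (0..workers).map(|_| wg.clone()).collect();
        let builder = Arc::new(Builder { wg: Mutex::new(Some(wg)), finished: Mutex::new(0) });
        for worker_wg in worker_wgs {
            let b = Arc::clone(&builder);
            thread::spawn(move || {
                *b.finished.lock().unwrap() += 1; // stand-in for compressing a block
                drop(worker_wg); // this worker is done
            });
        }
        builder
    }

    fn complete(&self) {
        // The guard is released at the end of this statement, before waiting.
        let wg = self.wg.lock().unwrap().take().expect("complete() called twice");
        wg.wait();
        // flush blocks to disk here
    }
}

fn main() {
    let builder = Builder::new(4);
    builder.complete();
    assert_eq!(*builder.finished.lock().unwrap(), 4);
}
```

With the real crate, `crossbeam_utils::sync::WaitGroup` should slot in where `MiniWaitGroup` is used, since it has the same `new`, `clone`, and `wait(self)` shape.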

I've made several attempts with pointers, smart pointers, and mutexes, but I can't get WaitGroup to meet my needs. It would be great if WaitGroup could support this kind of use case.
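If the main goal is in-place mutation of large blocks, scoped threads (`std::thread::scope`, stable since Rust 1.63) may remove the need for the `Arc`/`Mutex` layering altogether: each thread borrows a disjoint `&mut` chunk of the block list, and `scope` does not return until every spawned thread has finished, so the end of the scope is itself the final sync point. This is a different technique from `WaitGroup`, not a fix for it. `finalize_in_place` is a hypothetical name, and `Vec<u8>` stands in for the block data because `Bytes` is immutable and cannot be rewritten in place (`bytes::BytesMut` is its mutable counterpart).

```rust
use std::thread;

/// Illustrative block; `Vec<u8>` stands in for a mutable buffer such as `BytesMut`.
struct Block {
    data: Vec<u8>,
}

/// Hypothetical in-place finalization, splitting the blocks across up to `threads` threads.
fn finalize_in_place(blocks: &mut [Block], threads: usize) {
    let threads = threads.max(1);
    // Ceiling division; `chunks_mut` panics on a chunk size of 0, hence `max(1)`.
    let chunk_len = ((blocks.len() + threads - 1) / threads).max(1);
    thread::scope(|s| {
        for chunk in blocks.chunks_mut(chunk_len) {
            // Each thread gets exclusive `&mut` access to a disjoint chunk, so no lock is needed.
            s.spawn(move || {
                for block in chunk.iter_mut() {
                    for byte in block.data.iter_mut() {
                        *byte = byte.wrapping_add(1); // stand-in for compress + encrypt
                    }
                }
            });
        }
    }); // `scope` joins every spawned thread before returning: the final sync point
}

fn main() {
    let mut blocks = vec![
        Block { data: vec![0, 1] },
        Block { data: vec![254] },
        Block { data: vec![255] },
    ];
    finalize_in_place(&mut blocks, 2);
    let data: Vec<Vec<u8>> = blocks.iter().map(|b| b.data.clone()).collect();
    assert_eq!(data, vec![vec![1, 2], vec![255], vec![0]]);
}
```

The trade-off is that the borrow checker ties the workers' lifetime to one call, so this fits a "finalize everything, then flush" step rather than long-lived background workers fed by concurrent `add` calls.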

@taiki-e
Member

taiki-e commented Dec 21, 2023

I guess you want a variant of barrier that does not need to know the number of threads at construction, right?
https://docs.rs/crossbeam-utils/latest/crossbeam_utils/sync/struct.WaitGroup.html#wait-groups-vs-barriers
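For comparison, `std::sync::Barrier` must be told the number of participants when it is constructed, which is the limitation the question points at. A minimal sketch, assuming the worker count is fixed when the builder is created; the coordinating thread counts as one extra participant, and `run_with_barrier` is an illustrative name:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

/// Runs `workers` threads and returns how many had finished their work when the barrier released.
fn run_with_barrier(workers: usize) -> usize {
    let processed = Arc::new(AtomicUsize::new(0));
    // +1 because the coordinating thread also waits on the barrier.
    let barrier = Arc::new(Barrier::new(workers + 1));

    for _ in 0..workers {
        let barrier = Arc::clone(&barrier);
        let processed = Arc::clone(&processed);
        thread::spawn(move || {
            processed.fetch_add(1, Ordering::SeqCst); // stand-in for block work
            barrier.wait(); // `wait` takes `&self`, so sharing through an `Arc` is fine
        });
    }

    barrier.wait(); // final sync point: returns once every worker has arrived
    processed.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(run_with_barrier(4), 4);
}
```

Because `Barrier::wait` borrows rather than consumes, it avoids the E0507 error; the cost is that the participant count is fixed at construction and cannot grow as more work is queued.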

@siennathesane
Author

> I guess you want a variant of barrier that does not need to know the number of threads at construction, right?

And one that is also potentially scalable. WaitGroup allows users to add or remove threads as needed, which makes it useful as a "dumb barrier" of sorts. I can likely use a barrier as a drop-in replacement for now (I haven't tried yet), but I wanted to surface this type of use case.
