capnproto / capnproto-rust

Cap'n Proto for Rust
MIT License

How to send generated readers across threads #479

Open f1recracker opened 8 months ago

f1recracker commented 8 months ago

Hello,

Thank you for this project! Also, apologies if this is a duplicate - I've found similar questions but I'm not quite sure if we're solving the same problem.

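So I'm writing a program that consists of two threads: one that produces results from Cap'n Proto messages and publishes them into a shared map, and one that reads them from that map.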

I've been able to build an example of this using Rust-native structs, but I can't figure out how to switch to Cap'n Proto-generated code. Here's a minimal example using the addressbook schema.


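use std::sync::Arc;
use std::time::Duration;

use capnp::serialize_packed;
use dashmap::DashMap;

// Generated from the addressbook schema, included as `addressbook_capnp`
// (as in capnproto-rust's example/addressbook).
use crate::addressbook_capnp::address_book;

// Rust-native copy of the fields needed from a `Person`.
struct People {
    id: u32,
    // ...
}

pub fn main() {
    let result_map = Arc::new(DashMap::<String, People>::new());

    // Thread 1: build a message, serialize it, read it back, publish a result.
    let producer = {
        let result_map = result_map.clone();
        std::thread::spawn(move || {
            let mut buffer = vec![];
            {
                // `build_address_book()` (not shown) builds the message.
                let builder = build_address_book();
                serialize_packed::write_message(&mut buffer, &builder).unwrap();
            }
            let reader = serialize_packed::read_message(
                std::io::Cursor::new(buffer),
                capnp::message::ReaderOptions::new(),
            )
            .unwrap();
            let address_book = reader.get_root::<address_book::Reader>().unwrap();
            let _people = address_book.get_people().unwrap().get(0);
            // How to avoid using a Rust-native struct here and use `_people`?
            result_map.insert("foo".into(), People { id: _people.get_id() });
        })
    };

    // Thread 2: poll the map until the result shows up.
    let consumer = {
        let result_map = result_map.clone();
        std::thread::spawn(move || loop {
            match result_map.get("foo") {
                Some(person) => {
                    println!("Person: {:?}", person.id);
                    break;
                }
                None => std::thread::sleep(Duration::from_secs(1)),
            }
        })
    };

    // Without these joins, main would return and kill the consumer thread.
    producer.join().unwrap();
    consumer.join().unwrap();
}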

Specifically I have two questions:

Thank you!

dwrensha commented 8 months ago

You can't put _people into result_map because the lifetime of _people is scoped to the borrow at

let address_book = reader.get_root::<address_book::Reader>().unwrap();

I don't know of a good way to avoid copying the data in this situation.
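To make the lifetime point concrete, here is a std-only sketch (no capnp dependency, so it compiles on its own). `PersonView`, `OwnedPerson`, `SharedMap` and `publish` are hypothetical names: `PersonView` plays the role of a generated `person::Reader`, a `Copy` view that borrows the message bytes, and an `RwLock<HashMap>` stands in for the `DashMap`. Because the map is shared with other threads, it has to own its values, so the sketch copies the fields out, which is the copy described above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for a generated `person::Reader`: a `Copy` view that
// borrows the serialized message bytes.
#[derive(Clone, Copy)]
struct PersonView<'a> {
    bytes: &'a [u8],
}

impl<'a> PersonView<'a> {
    fn get_id(&self) -> u32 {
        u32::from_le_bytes([self.bytes[0], self.bytes[1], self.bytes[2], self.bytes[3]])
    }

    // Copy the fields out so the result no longer borrows `bytes`.
    fn to_owned_person(&self) -> OwnedPerson {
        OwnedPerson { id: self.get_id() }
    }
}

#[derive(Debug, Clone, PartialEq)]
struct OwnedPerson {
    id: u32,
}

// Std-only stand-in for `Arc<DashMap<..>>`: values must be owned ('static)
// because other threads may read them after the producer's buffer is gone.
type SharedMap = Arc<RwLock<HashMap<String, OwnedPerson>>>;

fn publish(map: &SharedMap, key: &str, buffer: Vec<u8>) {
    // `view` borrows `buffer`, which is dropped when this function returns,
    // so a map of `PersonView<'static>` could not accept it.
    let view = PersonView { bytes: &buffer };
    map.write().unwrap().insert(key.to_string(), view.to_owned_person());
}

fn main() {
    let map: SharedMap = Arc::new(RwLock::new(HashMap::new()));

    // Producer thread: "decode" a message and publish an owned copy.
    let producer = {
        let map = Arc::clone(&map);
        thread::spawn(move || publish(&map, "foo", 42u32.to_le_bytes().to_vec()))
    };
    producer.join().unwrap();

    // Consumer thread: read the owned copy.
    let consumer = {
        let map = Arc::clone(&map);
        thread::spawn(move || {
            let guard = map.read().unwrap();
            guard.get("foo").cloned()
        })
    };
    println!("{:?}", consumer.join().unwrap()); // prints "Some(OwnedPerson { id: 42 })"
}
```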

One thing you can do is have your shared hashmap store values of type message::TypedBuilder<person::Owned, HeapAllocator>. You would fill each such value via set_root(), which copies the data but should be fast.

https://github.com/capnproto/capnproto-rust/blob/b85e041a1852cd4b3a7d7860ef4fbe16369c73e9/capnp/src/message.rs#L540-L554

To get this approach to perform optimally, you'll probably also need to adjust the segment size of the allocator given to message::TypedBuilder, so that it's just big enough to store _people. The _people.total_size() method should help with this.
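As a worked example of the sizing step: `total_size()` returns a `MessageSize` whose `word_count` is measured in 8-byte words, and a fresh message also needs one word for its root pointer, so a first segment of `word_count + 1` words should hold the copied struct without allocating a second segment (the same `+ 1` shows up in C++ code that copies a reader into a `MallocMessageBuilder`). The helper below is a hypothetical, std-only sketch of that arithmetic; in capnp-rust the result would presumably be passed to `HeapAllocator::new().first_segment_words(n)` before creating the `message::Builder` and wrapping it with `message::TypedBuilder::new`, but check those names against the capnp version you use.

```rust
// One Cap'n Proto word is 8 bytes.
const BYTES_PER_WORD: u64 = 8;

// Hypothetical helper: given the `word_count` from a reader's `total_size()`,
// return a first-segment size (in words) with room for the copied struct plus
// the one-word root pointer, or `None` if it does not fit in a `u32`.
fn first_segment_words(word_count: u64) -> Option<u32> {
    let words = word_count.checked_add(1)?; // +1 word for the root pointer
    if words > u64::from(u32::MAX) {
        None
    } else {
        Some(words as u32)
    }
}

fn main() {
    // Suppose `_people.total_size()?.word_count` reported 6 words.
    let words = first_segment_words(6).expect("struct too large for a single segment");
    // In capnp-rust, `words` would then go to something like
    // `HeapAllocator::new().first_segment_words(words)` (assumed API).
    println!("first segment: {} words = {} bytes", words, u64::from(words) * BYTES_PER_WORD);
    // prints "first segment: 7 words = 56 bytes"
}
```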

f1recracker commented 8 months ago

Thank you! This was indeed helpful. I also had to wrap the TypedBuilder in an Arc<Mutex<...>> to make it Sync. As expected, this has some overhead compared with Rust-native structs.

I think I have a clearer vision of what I was originally hoping to achieve. I might create a SharedReader for write-once-read-many use cases that contains a shared (Arc) handle to the serialized buffer and a pointer to the object of interest within it.

This is roughly the API I have in mind:

// Producer 
let builder = build_address_book();
let mut buffer = Vec::new();
serialize::write_message(&mut buffer, &builder).unwrap(); // unpacked, so consumers can read the bytes in place
let buffer = Arc::new(buffer);

let person_0_ptr = addressbook::root_ptr().get_people().get(0);
let person_0_ref = SharedReader::new(buffer.clone(), person_0_ptr); // Send + Sync + Clone

let person_1_id_ptr = addressbook::root_ptr().get_people().get(1).get_id();
let person_1_id_ref = SharedReader::new(buffer.clone(), person_1_id_ptr); // Send + Sync + Clone

// Consumer 1
let person = person_0_ref.value();
foo(person.get_name(), person.get_id());

// Consumer 2
let person = person_1_id_ref.value();
...

I have a working implementation, albeit without the pointers, so clients need to explicitly dereference the objects they're interested in. From a separation-of-concerns perspective, though, I'd like the producer to choose them.
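A std-only sketch of the SharedReader idea, under loudly stated assumptions: `SharedReader`, `PersonPtr` and `PersonReader` are hypothetical, and the wire format is a toy (a flat list of 4-byte little-endian ids), not Cap'n Proto's. What it shows is the ownership shape: the producer picks the pointer, the reader owns an `Arc` of the bytes plus that pointer (so it is `Send + Sync + Clone`), and `value()` re-derives a borrowed view that lives only as long as `&self`, much like a generated reader borrows its message. A real version would keep the bytes unpacked (packed bytes must be unpacked before they can be read in place, e.g. via `capnp::serialize::read_message_from_flat_slice`) and would need a per-schema way to express the pointer.

```rust
use std::sync::Arc;
use std::thread;

// Toy wire format (an assumption, NOT Cap'n Proto's): the root is a flat list
// of people, each encoded as a 4-byte little-endian id.
const RECORD_LEN: usize = 4;

// Hypothetical stand-in for a generated reader: a `Copy` view borrowing the bytes.
#[derive(Clone, Copy)]
struct PersonReader<'a> {
    bytes: &'a [u8],
}

impl<'a> PersonReader<'a> {
    fn get_id(&self) -> u32 {
        u32::from_le_bytes([self.bytes[0], self.bytes[1], self.bytes[2], self.bytes[3]])
    }
}

// Hypothetical "pointer" chosen by the producer: which person in the root list.
#[derive(Clone, Copy)]
struct PersonPtr {
    index: usize,
}

// Hypothetical SharedReader: a reference-counted buffer plus a pointer, so it
// is Send + Sync + Clone and can be handed to any consumer thread.
#[derive(Clone)]
struct SharedReader {
    buffer: Arc<[u8]>,
    ptr: PersonPtr,
}

impl SharedReader {
    fn new(buffer: Arc<[u8]>, ptr: PersonPtr) -> Self {
        SharedReader { buffer, ptr }
    }

    // Re-derive the borrowed view on demand; it cannot outlive `&self`.
    // Panics if the pointer is out of range for the buffer.
    fn value(&self) -> PersonReader<'_> {
        let start = self.ptr.index * RECORD_LEN;
        PersonReader { bytes: &self.buffer[start..start + RECORD_LEN] }
    }
}

fn main() {
    // Producer: serialize once, then hand out readers that share the same bytes.
    let mut bytes = Vec::new();
    for id in [7u32, 42] {
        bytes.extend_from_slice(&id.to_le_bytes());
    }
    let buffer: Arc<[u8]> = Arc::from(bytes);
    let person_0_ref = SharedReader::new(Arc::clone(&buffer), PersonPtr { index: 0 });
    let person_1_ref = SharedReader::new(Arc::clone(&buffer), PersonPtr { index: 1 });

    // Consumers: each thread dereferences only the object it was given.
    let handles: Vec<_> = vec![person_0_ref, person_1_ref]
        .into_iter()
        .map(|r| thread::spawn(move || r.value().get_id()))
        .collect();
    let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    println!("{:?}", ids); // prints "[7, 42]"
}
```

Re-deriving the view in `value()` costs only an index computation and a bounds check here, which keeps the shared handle free of lifetimes; the trade-off is that consumers cannot hold the view beyond their borrow of the reader.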

I'm going to try to see if I can hack something together after looking at your repository.

Also, out of curiosity, do you think this might be a good addition to capnp's API?

tv42 commented 8 months ago

I believe this is a duplicate of #256 (though this one has a maintainer response!).