danielpclark / rutie

“The Tie Between Ruby and Rust.”
MIT License
939 stars 62 forks source link

Calling back into Ruby from Rust thread #152

Closed milgner closed 2 years ago

milgner commented 2 years ago

I tried using rutie::Thread::call_with_gvl from my Tokio thread to invoke a rutie Proc. Unfortunately this results in:

[BUG] rb_thread_call_with_gvl() is called by non-ruby thread

Is there any way to tell the VM to wait for the next opportunity to acquire the GVL and then execute some code in the context of the Ruby runtime? Are there any examples / best practices on how to call a Proc asynchronously?

milgner commented 2 years ago

So I managed to find a viable solution. Posting it here in case anyone encounters a similar situation.

The approach is to create a Ruby thread using rutie::Thread::new, and use a mpsc channel to supply it with data. Through the help of rutie::Thread#call_without_gvl we can do a blocking wait on the receiver.

Corresponding code (proof of concept :spaghetti:):

            let callback: Arc<rutie::Proc> = Arc::new(callback);

            let (tx, mut rx) = std::sync::mpsc::channel();
            // representative of an asynchronously-called Rust block
            let handler = move |data_source_id: String, error: String {
                tx.send((data_source_id, error));
            };
            // not 100% sure whether necessary but we're saving the callback for the GC mark phase
            let ruby_callback = callback.clone();
            let ruby_thread = rutie::Thread::new(move || {
                let recv = || { rx.recv() }; // this lambda is blocking but should not block the GVL
                let unblock = || {}; // no idea what should be done here?
                while let Ok((data_source_id, error)) = rutie::Thread::call_without_gvl(recv, Some(unblock)) {
                    // in here, the GVL is locked again and we can interact with the Ruby VM again
                    let data_source_id = RString::new_utf8(data_source_id.as_str());
                    let error = RString::new_utf8(error.as_str());
                    // invoke the Proc
                    callback.call(&[data_source_id.into(), error.into()]);
                }
                NilClass::new() // need a return value for the thread...
            });