fzyzcjy / flutter_rust_bridge

Flutter/Dart <-> Rust binding generator, feature-rich, but seamless and simple.
https://fzyzcjy.github.io/flutter_rust_bridge/
MIT License
3.75k stars 270 forks source link

Possible Solution for Async Rust Functions #966

Closed anlumo closed 1 year ago

anlumo commented 1 year ago

Is your feature request related to a problem? Please describe.

According to the documentation Section 3.9, having async functions is currently not possible due to issues with the Rust runtime.

Async functions are very important especially on the Web target, since all I/O is inherently async on the Web. So if you're trying to do anything other than data structures or pure compute, it won't work at all. Section 18.1 does provide some possible solutions, but they all don't work on the Web target, because there's no way to block.

Describe alternatives you've considered

There is a solution that's surprisingly simple. Note that I haven't actually tested it, this is purely based on the documentation.

There's Stream support according to Section 3.3. So, an async function can simply take a StreamSink<T> as the first parameter, then spawn a new task in whatever way the runtime wants (like wasm_bindgen_futures::spawn_local), return immediately, and once the result is available, call .add(T) on the StreamSink and then close the stream.

On the Dart side, the caller can call .single on the returned Stream and use that as the Future.

Describe the solution you'd like

Ideally, this should just work as a solution for async as well:

Couldn't the Stream feature be adjusted so that there's a similar solution for just returning a Future? So take a FutureSink<T> as the first parameter and then whenever you feel like, call .resolve(T) (which automatically closes the internal Stream). On the Dart side, this could be an actual Future that was created from the Stream (preceded with the Future created by the sync function call).

welcome[bot] commented 1 year ago

Hi! Thanks for opening your first issue here! :smile:

fzyzcjy commented 1 year ago

Well, the solution is indeed simpler:

The stream, and normal rust function, indeed supports it naturally. This is because, their implementation looks like:

// normal function
fn generated_function() {
  let ans = your_rust_sync_function_in_api_rs();
  dart_utility_send_data(ans);
}

// stream
impl StreamSink {
  pub fn add(data) { dart_utility_send_data(data); }
}

So, to support Rust async, we already have all things needed:

async fn generated_function() {
  let ans = your_rust_sync_function_in_api_rs().await; // JUST ADD AWAIT
  dart_utility_send_data(ans);
}
fzyzcjy commented 1 year ago

In other words, notice that we never really require the function in api.rs to be synchronous (except if you use SyncReturn) :)

fzyzcjy commented 1 year ago

Feel free to PR and looking forward to this feature!

anlumo commented 1 year ago

Who would be polling the Future in your solution?

fzyzcjy commented 1 year ago

Anyone you like. Maybe you have another function, say:

fn a_function_that_indeed_is_long_running() {
  poll the futures forever
}

and you call it in your setup code of Dart.

Note that, every function is (by default) async in Dart. Thus, even if this function never returns, the dart code is not blocked at all.

Feel free to PR!

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

anlumo commented 1 year ago

👀

stale[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

danielleiszen commented 1 year ago

Hi, sorry for my dumbness, but I do not get it completely by the given example. I have a rust library that I would like to make available for my Flutter app. However this lib is also used by a gRPC service and every single method in it is async, using the Tokio runtime.

So I have an instance that I would like to use and that has async methods.

let instance = my_lazy_mutex.lock().unwrap();
instance.call_some_async().await;

Could you please clarify what do I do exactly to wrap that code segment in my API?

anlumo commented 1 year ago

Here's my actual code, but just one workflow extracted out of it. It's reading a string from the database indexed by a key. I'm using persy on the native side and IndexedDB on the wasm side for permanent storage.

api.rs:

pub struct Preferences {
    pub database: RustOpaque<PreferencesDatabase>,
}

impl Preferences {
    pub fn read(&self, sink: StreamSink<String>, key: String) {
        let database = self.database.clone();
        AsyncAdapter::spawn(async move {
            match database.read(key).await {
                Ok(Some(value)) => {
                    sink.add(value);
                }
                Ok(None) => {}
                Err(e) => {
                    log::error!("{e}");
                }
            }
            sink.close();
        });
    }
}

Native implementation:

pub(crate) struct AsyncAdapter;

impl AsyncAdapter {
    pub(crate) fn spawn_stream<T: IntoDart + 'static>(
        sink: StreamSink<T>,
        f: impl for<'a> FnOnce(&'a StreamSink<T>) -> Pin<Box<dyn Future<Output = ()> + 'a>> + 'static,
    ) -> anyhow::Result<()> {
        futures::executor::block_on(async move {
            f(&sink).await;
            sink.close();
        });

        Ok(())
    }

    pub(crate) fn spawn<F: Future + 'static>(f: F) {
        futures::executor::block_on(f);
    }
}

pub struct PreferencesDatabase {
    persy: AssertUnwindSafe<Persy>,
}

impl PreferencesDatabase {
    pub async fn read(&self, key: String) -> anyhow::Result<Option<String>> {
        let mut tx = self.persy.begin()?;
        let Some(persy_id) = tx.one::<_, PersyId>(INDEX_PREFERENCES_KEY, &key)? else {
            tx.rollback()?;
            return Ok(None);
        };

        let data = tx
            .read(PREFERENCES_SEGMENT, &persy_id)?
            .ok_or(anyhow::anyhow!(
                "Internal database error: inconsistent index for preferences key {key:?}"
            ))?;

        tx.commit()?;

        Ok(String::from_utf8(data).map(Some)?)
    }
}

wasm implementation:

pub(crate) struct AsyncAdapter;

impl AsyncAdapter {
    pub(crate) fn spawn_stream<T: IntoDart + 'static>(
        sink: StreamSink<T>,
        f: impl for<'a> FnOnce(&'a StreamSink<T>) -> Pin<Box<dyn Future<Output = ()> + 'a>> + 'static,
    ) -> anyhow::Result<()> {
        wasm_bindgen_futures::spawn_local(async move {
            f(&sink).await;
            sink.close();
        });

        Ok(())
    }

    pub(crate) fn spawn<F: Future<Output = ()> + 'static>(f: F) {
        wasm_bindgen_futures::spawn_local(f);
    }
}

pub struct PreferencesDatabase {
    #[allow(unused)]
    foo: u8,
}

impl PreferencesDatabase {
    pub async fn read(&self, key: String) -> anyhow::Result<Option<String>> {
        let transaction = map_anyhow(
            Self::database()
                .await?
                .transaction(&["preferences"], TransactionMode::ReadOnly),
        )?;
        let table = map_anyhow_debug(transaction.object_store("preferences"))?;

        let mut keyvalue_request = map_anyhow(table.get(Query::from(JsValue::from(key))))?;

        let (success_sender, mut success_receiver) = futures::channel::oneshot::channel();
        let (error_sender, mut error_receiver) = futures::channel::oneshot::channel();

        keyvalue_request.on_success(move |event| {
            let keyvalue_request = StoreRequest::try_from(event.target().unwrap()).unwrap();
            if let Ok(keyvalue) = keyvalue_request.result() {
                if keyvalue.is_truthy() {
                    if let Some(value) = Reflect::get(&keyvalue, &"value".into())
                        .unwrap()
                        .as_string()
                    {
                        success_sender.send(Some(value)).unwrap();
                        return;
                    }
                }
            }
            success_sender.send(None).unwrap();
        });
        keyvalue_request.on_error(move |event| {
            log::error!("Failed reading from indexeddb: {event:?}");
            error_sender.send(event).ok();
        });

        futures::select! {
            value = success_receiver => {
                Ok(value.unwrap())
            },
            err = error_receiver => {
                log::error!("Failed indexeddb transaction: {err:?}");
                Err(anyhow::anyhow!("{:?}", err.unwrap()))
            }
        }
    }
}
github-actions[bot] commented 1 year ago

This thread has been automatically locked since there has not been any recent activity after it was closed. If you are still experiencing a similar issue, please open a new issue.