fzyzcjy / flutter_rust_bridge

Flutter/Dart <-> Rust binding generator, feature-rich, but seamless and simple.
https://fzyzcjy.github.io/flutter_rust_bridge/
MIT License

New version: future cannot be sent between threads safely #2317

Closed niuhuan closed 3 weeks ago

niuhuan commented 3 weeks ago

The async fn support in flutter_rust_bridge is a great feature. I have tried it in other repositories and it works very well, but I ran into difficulties upgrading frb in some older repositories.

I upgraded flutter_rust_bridge from v1.54.0 to v2.4.0.

The problem is hard to describe in words, so please compare the two versions of my code below.

This is the code with v1.54.0. I created a static RUNTIME and a block_on helper that uses it, and this ran successfully.


lazy_static! {
    pub(crate) static ref RUNTIME: runtime::Runtime = runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_keep_alive(tokio::time::Duration::new(60, 0))
        .worker_threads(30)
        .max_blocking_threads(30)
        .build()
        .unwrap();
}

fn block_on<T>(f: impl Future<Output = T>) -> T {
    RUNTIME.block_on(f)
}

pub fn comic_categories() -> Result<Vec<ComicCategory>> {
    block_on(web_cache::cache_first(
        "COMIC_CATEGORIES".to_owned(),
        Duration::from_secs(60 * 60 * 10),
        Box::pin(async { CLIENT.read().await.comic_categories().await }),
    ))
}

pub(crate) async fn cache_first<T: for<'de> serde::Deserialize<'de> + serde::Serialize>(
    key: String,
    expire: Duration,
    pin: Pin<Box<dyn Future<Output = anyhow::Result<T>>>>,
) -> anyhow::Result<T> {
    let time = chrono::Local::now().timestamp_millis();
    let db = CACHE_DATABASE.get().unwrap().lock().await;
    let in_db = Entity::find_by_id(key.clone()).one(db.deref()).await?;
    if let Some(ref model) = in_db {
        if time < (model.cache_time + expire.as_millis() as i64) {
            return Ok(serde_json::from_str(&model.cache_content)?);
        }
    };
    let t = pin.await?;
    let content = serde_json::to_string(&t)?;
    if let Some(_) = in_db {
        Entity::update_many()
            .filter(Column::CacheKey.eq(key.clone()))
            .col_expr(Column::CacheTime, Expr::value(time.clone()))
            .col_expr(Column::CacheContent, Expr::value(content.clone()))
            .exec(db.deref())
            .await?;
    } else {
        Model {
            cache_key: key,
            cache_content: content,
            cache_time: time,
        }
        .into_active_model()
        .insert(db.deref())
        .await?;
    }
    Ok(t)
}

I am eager to use the new version of FRB, whose async support is much more efficient than my own. This is the code with v2.4.0, where I deleted block_on and RUNTIME.

Wow, the code becomes much neater.

pub async fn comic_categories() -> Result<Vec<ComicCategory>> {
    web_cache::cache_first(
        "COMIC_CATEGORIES".to_owned(),
        Duration::from_secs(60 * 60 * 10),
        Box::pin(async { CLIENT.read().await.comic_categories().await }),
    ).await
}

But it does not compile. Here is the compiler output:

error[E0277]: `(dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static)` cannot be sent between threads safely
    --> src/frb_generated.rs:125:33
     |
125  |     FLUTTER_RUST_BRIDGE_HANDLER.wrap_async::<flutter_rust_bridge::for_generated::SseCodec, _, _, _>(
     |                                 ^^^^^^^^^^ `(dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static)` cannot be sent between threads safely
     |
     = help: the trait `std::marker::Send` is not implemented for `(dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static)`, which is required by `{async block@src/frb_generated.rs:142:28: 142:38}: TaskRetFutTrait`
     = note: required for `std::ptr::Unique<(dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static)>` to implement `std::marker::Send`
note: required because it appears within the type `Box<(dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static)>`
    --> /Volumes/DATA/Runtimes/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:233:12
     |
233  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<(dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static)>>`
    --> /Volumes/DATA/Runtimes/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/pin.rs:1089:12
     |
1089 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> src/database/cache/web_cache.rs:45:24
     |
45   |   ) -> anyhow::Result<T> {
     |  ________________________^
46   | |     let time = chrono::Local::now().timestamp_millis();
47   | |     let db = CACHE_DATABASE.get().unwrap().lock().await;
48   | |     let in_db = Entity::find_by_id(key.clone()).one(db.deref()).await?;
...    |
73   | |     Ok(t)
74   | | }
     | |_^
note: required because it's used within this `async` fn body
    --> src/api/bridge.rs:265:63
     |
265  |   pub async fn comic_categories() -> Result<Vec<ComicCategory>> {
     |  _______________________________________________________________^
266  | |     web_cache::cache_first(
267  | |         "COMIC_CATEGORIES".to_owned(),
268  | |         Duration::from_secs(60 * 60 * 10),
269  | |         Box::pin(async { CLIENT.read().await.comic_categories().await }),
270  | |     ).await
271  | | }
     | |_^
note: required because it's used within this `async` block
    --> src/frb_generated.rs:144:30
     |
144  |                     (move || async move {
     |                              ^^^^^^^^^^
note: required because it's used within this `async` block
    --> src/frb_generated.rs:142:28
     |
142  |             move |context| async move {
     |                            ^^^^^^^^^^
     = note: required for `{async block@src/frb_generated.rs:142:28: 142:38}` to implement `TaskRetFutTrait`
note: required by a bound in `wrap_async`
    --> /Volumes/DATA/Runtimes/.cargo/registry/src/index.crates.io-6f17d22bba15001f/flutter_rust_bridge-2.4.0/src/handler/handler.rs:56:15
     |
48   |     fn wrap_async<Rust2DartCodec, PrepareFn, TaskFn, TaskRetFut>(
     |        ---------- required by a bound in this associated function
...
56   |             + TaskRetFutTrait,
     |               ^^^^^^^^^^^^^^^ required by this bound in `Handler::wrap_async`

I think the problem lies in the mutex or the pin parameter, but I don't know how to fix it, and I have many functions with the same problem.

I have tried wrapping each function in a struct and calling a method on it to avoid this problem, but that is a lot of work. I hope someone can tell me the right way.

fzyzcjy commented 3 weeks ago

At first glance, I guess it is because the future returned by an async fn really needs to be Send, since it can be executed across multiple threads.

So my first guess is, try to ensure things are Send, e.g.

dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + 'static

change to

dyn futures_util::Future<Output = Result<Vec<entities::ComicCategory>, anyhow::Error>> + Send + 'static
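
To illustrate why the bound matters, here is a minimal std-only sketch. It is not FRB's or tokio's actual code: block_on and run_on_worker are toy helpers, and every name in it is an assumption made for the example. Polling on the calling thread (comparable to RUNTIME.block_on in the v1 code) accepts a boxed future without Send, while handing the future to another thread (comparable in spirit to the Send requirement the log reports for wrap_async) only compiles once the boxed future carries + Send.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical aliases: the parameter type before and after the suggested change.
type LocalBoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
type SendBoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// The future of an async fn is Send only if everything it holds across an await is Send.
async fn await_local(inner: LocalBoxFuture<i32>) -> i32 {
    inner.await
}

async fn await_send(inner: SendBoxFuture<i32>) -> i32 {
    inner.await
}

// Waker that does nothing; enough for a toy executor that simply re-polls.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls on the calling thread, so the future does not need to be Send.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

// Moves the future to another thread before polling it, so it must be Send.
fn run_on_worker<F>(fut: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    std::thread::spawn(move || block_on(fut))
        .join()
        .expect("worker thread panicked")
}

fn demo() -> (i32, i32) {
    let local = block_on(await_local(Box::pin(async { 1_i32 })));
    let sent = run_on_worker(await_send(Box::pin(async { 2_i32 })));
    // run_on_worker(await_local(Box::pin(async { 1_i32 }))); // rejected with E0277
    (local, sent)
}
```

Uncommenting the last call in demo should reproduce the same kind of E0277 error as in the log above, because the future of await_local holds a boxed future that is not Send.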

niuhuan commented 3 weeks ago

Thanks for your help.

That solved the problem. I changed the parameter of cache_first from

pin: Pin<Box<dyn Future<Output=anyhow::Result<T>>>>,

to

pin: Pin<Box<dyn Future<Output=anyhow::Result<T>> + Send>>,
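
For anyone with many functions of this shape, one option is to put the bound into a type alias so that every signature picks it up. The following is only a sketch under stated assumptions: the in-memory Cache stands in for the database-backed cache in the original code, expiry handling is omitted, errors are plain strings instead of anyhow, and assert_send and block_on are hypothetical helpers written for this example. The alias has the same shape as futures_util::future::BoxFuture<'static, T>, but is declared locally so the sketch needs only std.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Boxed future that is allowed to cross threads (note the `+ Send`).
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Hypothetical in-memory stand-in for the database cache used in the thread.
type Cache = Arc<Mutex<HashMap<String, String>>>;

// Simplified cache_first: return the cached value if present, otherwise
// await the fetch future and store its result. Expiry is left out.
async fn cache_first(
    cache: Cache,
    key: String,
    fetch: BoxFuture<Result<String, String>>,
) -> Result<String, String> {
    // The lock guard is a temporary dropped at the end of this statement,
    // so it is never held across an await point.
    let cached = cache.lock().unwrap().get(&key).cloned();
    if let Some(hit) = cached {
        return Ok(hit);
    }
    let fresh = fetch.await?;
    cache.lock().unwrap().insert(key, fresh.clone());
    Ok(fresh)
}

// Compile-time check: accepts only values that are Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor so the sketch can run without an async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

// First call fetches and stores; second call is served from the cache.
fn demo() -> (Result<String, String>, Result<String, String>) {
    let cache: Cache = Arc::new(Mutex::new(HashMap::new()));
    let key = "COMIC_CATEGORIES".to_owned();
    let first = block_on(assert_send(cache_first(
        cache.clone(),
        key.clone(),
        Box::pin(async { Ok::<String, String>("fresh".to_owned()) }),
    )));
    let second = block_on(assert_send(cache_first(
        cache,
        key,
        Box::pin(async { Err::<String, String>("not fetched".to_owned()) }),
    )));
    (first, second)
}
```

The assert_send call is the part that would fail to compile if the alias dropped + Send, which makes it a cheap way to check a helper before the generated FRB code does.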

fzyzcjy commented 3 weeks ago

Happy to see it is solved!
