mvniekerk / tokio-cron-scheduler

Schedule tasks on Tokio using cron-like annotation
Apache License 2.0
473 stars 58 forks source link

Can't run an async function that operate MySQL via Connection Pool. #8

Closed rts-gordon closed 2 years ago

rts-gordon commented 2 years ago

Hi @mvniekerk, Thank you for offering the wonderful project "tokio-cron-scheduler". I use tokio-cron-scheduler run an async function to write MySQL, but there are some errors, would you like to have a look at those, thank you very much.

Code:

#[tokio::main]
async fn main()  {
    let db_pool = connect("mysql://root:123456@localhost:3306", 10, 30).await.unwrap();

    let mut sched = JobScheduler::new();
    let job_async = Job::new_async("0 */1 * * * * *", |_uuid, _l| Box::pin(async move {
            cron_sync_ohlc_to_db(&db_pool).await.unwrap_or(());
        })).unwrap();
    sched.add(job_async).unwrap();
    tokio::spawn(sched.start());

    let server = TcpListener::bind("127.0.0.1:3000").await.unwrap();
    println!("Sync listening on: 127.0.0.1:3000");

    loop {
            server.accept().await.unwrap();
    }
}

pub async fn connect(url: &str, min_conn: u32, max_conn: u32) -> Result<MySqlPool, sqlx::Error> {
    println!("Connecting to MySql...");

    let pool = MySqlPoolOptions::new()
    .min_connections(min_conn)
    .max_connections(max_conn)
    .connect(url).await?;

    Ok(pool)
}

pub async fn cron_sync_ohlc_to_db(
    db_pool: &MySqlPool,
) -> anyhow::Result<()> {
    println!("Begin to cron sync ohlc data to databse...");

    let mut tx = db_pool.begin().await.unwrap();
    let sql_string = "INSERT INTO test1 (u,a,b,c,d) VALUES (1613347800, 1, 2, 3, 4)";
    let sql_query = sqlx::query(&sql_string)
        .execute(&mut tx)
        .await?
        .last_insert_id();

    tx.commit().await.unwrap();

    Ok(())
}

Compile errors:

 `dyn Future<Output = Result<bool, sqlx::Error>> + Send` cannot be shared between threads safely

More details:

error[E0277]: `dyn Future<Output = Result<bool, sqlx::Error>> + Send` cannot be shared between threads safely
  --> src/main.rs:24:9
   |
24 | /         Box::pin(async move {
25 | |             cron_sync_ohlc_to_db(&db_pool).await.unwrap_or(());
26 | |         })
   | |__________^ `dyn Future<Output = Result<bool, sqlx::Error>> + Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn Future<Output = Result<bool, sqlx::Error>> + Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn Future<Output = Result<bool, sqlx::Error>> + Send>`
   = note: required because it appears within the type `Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>`
   = note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>>`      
   = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13> {ResumeTy, pool::connection::Floating<'r, pool::connection::Idle<MySql>>, &'s PoolOptions<MySql>, impl Future, (), &'t2 mut pool::connection::Floating<'t3, pool::connection::Idle<MySql>>, impl Future, Result<(), sqlx::Error>, sqlx::Error, PoolOptions<MySql>, std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't7)>>, &'t8 std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't9)>>, &'t10 Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't11)>, pool::connection::Idle<MySql>, pool::connection::Live<MySql>, MySqlConnection, &'t12 mut MySqlConnection, Pin<Box<(dyn Future<Output = Result<bool, sqlx::Error>> + Send + 't13)>>, Result<bool, sqlx::Error>, bool}`
   = note: required because it appears within the type `[static generator@pool::inner::check_conn<'_, '<empty>, MySql>::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13> {ResumeTy, pool::connection::Floating<'r, pool::connection::Idle<MySql>>, &'s PoolOptions<MySql>, impl Future, (), &'t2 mut pool::connection::Floating<'t3, pool::connection::Idle<MySql>>, impl Future, Result<(), sqlx::Error>, sqlx::Error, PoolOptions<MySql>, std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't7)>>, &'t8 std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't9)>>, &'t10 Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't11)>, pool::connection::Idle<MySql>, pool::connection::Live<MySql>, MySqlConnection, &'t12 mut MySqlConnection, Pin<Box<(dyn Future<Output = Result<bool, sqlx::Error>> + Send + 't13)>>, Result<bool, sqlx::Error>, bool}]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@pool::inner::check_conn<'_, '<empty>, MySql>::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13> {ResumeTy, pool::connection::Floating<'r, pool::connection::Idle<MySql>>, &'s PoolOptions<MySql>, impl Future, (), &'t2 mut pool::connection::Floating<'t3, pool::connection::Idle<MySql>>, impl Future, Result<(), sqlx::Error>, sqlx::Error, PoolOptions<MySql>, std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> 
+ Send>> + Send + Sync + 't7)>>, &'t8 std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't9)>>, &'t10 Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't11)>, pool::connection::Idle<MySql>, pool::connection::Live<MySql>, MySqlConnection, &'t12 mut MySqlConnection, Pin<Box<(dyn Future<Output = Result<bool, sqlx::Error>> + Send + 't13)>>, Result<bool, sqlx::Error>, bool}]>`
mvniekerk commented 2 years ago

Hi @CHCP Thanks for the bug report. Can you perhaps provide an example repository that replicates your issue, that I can maybe look into it?

rts-gordon commented 2 years ago

@mvniekerk Thanks for your reply.

The source code is simple, there is the repository, it can't be compiled because of errors.

scheduler-test

mvniekerk commented 2 years ago

I had a quick look. Seems like the linked list implementation used somewhere in the sqlx crate was not marked as Sync / Send... This is bad news for your implementation. I also need to have something similar (cron job + sql commands). Will let you know on this .

rts-gordon commented 2 years ago

Hi @mvniekerk May I know the progress of this issue? Thanks for you help.

saabye-io commented 2 years ago

Hi I have the same issue, I think. It's just with the tokio_modbus Client:

future cannot be shared between threads safely
the trait `Sync` is not implemented for `(dyn tokio_modbus::prelude::Client + 'static)`

Any work around is appreciated.

saabye-io commented 2 years ago

An easy workaround is to send a message in the job, using mpsc:

    sched.add(Job::new_async("1/10 * * * * *", move |_uuid, _l| {
        let tx = tx.clone();
        Box::pin(async move {
            tx.send(1).await.unwrap();
        }
    })}).unwrap()).unwrap();

And then do the action in another thread:

    let (tx, mut rx) =  mpsc::channel::<u8>(1);
    tokio::spawn(async move {
        loop {
            for _ in rx.recv().await {
                do_whatever_wasnt_possible_before().await;
            }
        }
    });

This works for me at least.

rts-gordon commented 2 years ago

It is a possible way, I will try this. Thanks for sharing code.

mvniekerk commented 2 years ago

Thank you @saabye-io - that's about the only solution right now is if you "escape" out of sync+send land into non-sync+send land using mpsc. I'm going to close this ticket @CHCP because there's not much I can do. Thank you for the bug report - I'm sure this is a gotcha that others will also encounter.