mvniekerk / tokio-cron-scheduler

Schedule tasks on Tokio using cron-like annotation
Apache License 2.0
529 stars 59 forks source link

The "signal" feature doesn't work when combined with Axum #66

Open linyihai opened 6 months ago

linyihai commented 6 months ago

I use tokio-cron-scheduler to run scheduled jobs, and I run it together with an Axum server:

// Reproducer from the report: builds an Axum router, binds port 3030, starts the
// scheduled jobs, then serves the router.
#[tokio::main]
async fn main() {
    // Build the application with two routes ("/" and "/metrics") plus an
    // `Extension` layer carrying `ctx.test_metrics()`.
    let ctx = Context::new();
    let app = axum::Router::new()
        .route("/", axum::routing::get(|| async move { "Hello, World!" }))
        .route("/metrics", axum::routing::get(metrics::test_metrics))
        .layer(ServiceBuilder::new().layer(Extension(ctx.test_metrics())));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3030").await.unwrap();
    // NOTE(review): `scheduler::schedule` is defined elsewhere; presumably it builds
    // and starts the tokio-cron-scheduler jobs (and enables Ctrl+C shutdown) — confirm.
    scheduler::schedule(&ctx).await.unwrap();
    // The server is awaited directly; no `with_graceful_shutdown` is attached here.
    axum::serve(listener, app).await.unwrap();
}

I can't use ctrl + c to stop the running process.

linyihai commented 6 months ago

But this example code can be stopped with Ctrl+C:

use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};

/// Example program: registers four demo jobs on a `JobScheduler`, enables
/// shutdown on Ctrl+C, starts the scheduler and then sleeps for 100 seconds.
///
/// # Errors
/// Returns the first `JobSchedulerError` propagated by `?` from any scheduler call.
#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
    let mut scheduler = JobScheduler::new().await?;

    // Plain cron job on the "1/10 * * * * *" schedule.
    let ten_second_job = Job::new("1/10 * * * * *", |_uuid, _l| {
        println!("I run every 10 seconds");
    })?;
    scheduler.add(ten_second_job).await?;

    // Async cron job on the "1/7 * * * * *" schedule.
    let seven_second_job = Job::new_async("1/7 * * * * *", |job_id, mut handle| {
        Box::pin(async move {
            println!("I run async every 7 seconds");

            // Ask the scheduler when this job is due next.
            let upcoming = handle.next_tick_for_job(job_id).await;
            if let Ok(Some(ts)) = upcoming {
                println!("Next time for 7s job is {:?}", ts);
            } else {
                println!("Could not get next tick for 7s job");
            }
        })
    })?;
    scheduler.add(seven_second_job).await?;

    // One-shot job created with an 18-second duration.
    let one_shot_job = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
        println!("I only run once");
    })?;
    scheduler.add(one_shot_job).await?;

    // Repeated job created with an 8-second duration; kept mutable so that
    // notification callbacks can be attached before it is added.
    let mut repeated_job = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
        println!("I run repeatedly every 8 seconds");
    })?;

    // Callback for the "started" notification.
    repeated_job
        .on_start_notification_add(
            &scheduler,
            Box::new(|job, notification, state| {
                Box::pin(async move {
                    println!(
                        "Job {:?} was started, notification {:?} ran ({:?})",
                        job, notification, state
                    );
                })
            }),
        )
        .await?;

    // Callback for the "stopped" notification.
    repeated_job
        .on_stop_notification_add(
            &scheduler,
            Box::new(|job, notification, state| {
                Box::pin(async move {
                    println!(
                        "Job {:?} was completed, notification {:?} ran ({:?})",
                        job, notification, state
                    );
                })
            }),
        )
        .await?;

    // Callback for the "removed" notification.
    repeated_job
        .on_removed_notification_add(
            &scheduler,
            Box::new(|job, notification, state| {
                Box::pin(async move {
                    println!(
                        "Job {:?} was removed, notification {:?} ran ({:?})",
                        job, notification, state
                    );
                })
            }),
        )
        .await?;
    scheduler.add(repeated_job).await?;

    // Requires the crate's 'signal' feature.
    scheduler.shutdown_on_ctrl_c();

    // Code to run during/after shutdown.
    scheduler.set_shutdown_handler(Box::new(|| {
        Box::pin(async move {
            println!("Shut down done");
        })
    }));

    // Start the scheduler.
    scheduler.start().await?;

    // Keep the process alive while the jobs run.
    tokio::time::sleep(Duration::from_secs(100)).await;

    Ok(())
}
mvniekerk commented 3 months ago

Hi @linyihai I'm wondering, seeing that Axum uses Tokio, if they're also using the ctrl+c adapter we're using? In that Tokio must handle both?

linyihai commented 3 months ago

I'm wondering, seeing that Axum uses Tokio, if they're also using the ctrl+c adapter we're using? In that Tokio must handle both?

In my previous code, I didn't register a Ctrl+C signal handler for the Axum server instance, which prevented the process from being terminated by Ctrl+C.

The smallest reproducible example is as follows:

use std::time::Duration;
use tokio::signal;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};

// Minimal reproducer: starts the scheduler via `schedule()`, then serves a
// single `/ping` route on port 3030.
#[tokio::main]
async fn main() {
    // Register the demo jobs and start the scheduler; `schedule()` also calls
    // `shutdown_on_ctrl_c()` on it.
    schedule().await.unwrap();
    // Axum part: a single route that replies to `/ping` with "pong".
    let app = axum::Router::new().route("/ping", axum::routing::get(|| async move { "pong" }));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3030").await.unwrap();
    axum::serve(listener, app)
        // Commenting out the next line prevents the process from being terminated by Ctrl+C.
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

/// Builds a `JobScheduler`, registers four demo jobs on it and starts it.
///
/// The jobs are: a cron job ("1/10 * * * * *"), an async cron job
/// ("1/7 * * * * *"), a one-shot job created with an 18-second duration, and a
/// repeated job created with an 8-second duration that has start/stop/removed
/// notification callbacks. Shutdown on Ctrl+C is enabled and a shutdown handler
/// is installed before the scheduler is started.
///
/// # Errors
/// Returns the first `JobSchedulerError` propagated by `?` from any scheduler call.
pub async fn schedule() -> Result<(), JobSchedulerError> {
    let mut scheduler = JobScheduler::new().await?;

    // Plain cron job.
    let every_ten = Job::new("1/10 * * * * *", |_uuid, _l| {
        println!("I run every 10 seconds");
    })?;
    scheduler.add(every_ten).await?;

    // Async cron job.
    let every_seven = Job::new_async("1/7 * * * * *", |job_id, mut handle| {
        Box::pin(async move {
            println!("I run async every 7 seconds");

            // Look up this job's next execution time.
            let upcoming = handle.next_tick_for_job(job_id).await;
            if let Ok(Some(ts)) = upcoming {
                println!("Next time for 7s job is {:?}", ts);
            } else {
                println!("Could not get next tick for 7s job");
            }
        })
    })?;
    scheduler.add(every_seven).await?;

    // One-shot job.
    let once = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
        println!("I only run once");
    })?;
    scheduler.add(once).await?;

    // Repeated job; mutable so the notification callbacks below can be attached.
    let mut repeated = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
        println!("I run repeatedly every 8 seconds");
    })?;

    // Notification callback: job started.
    repeated
        .on_start_notification_add(
            &scheduler,
            Box::new(|job, notification, state| {
                Box::pin(async move {
                    println!(
                        "Job {:?} was started, notification {:?} ran ({:?})",
                        job, notification, state
                    );
                })
            }),
        )
        .await?;

    // Notification callback: job stopped.
    repeated
        .on_stop_notification_add(
            &scheduler,
            Box::new(|job, notification, state| {
                Box::pin(async move {
                    println!(
                        "Job {:?} was completed, notification {:?} ran ({:?})",
                        job, notification, state
                    );
                })
            }),
        )
        .await?;

    // Notification callback: job removed.
    repeated
        .on_removed_notification_add(
            &scheduler,
            Box::new(|job, notification, state| {
                Box::pin(async move {
                    println!(
                        "Job {:?} was removed, notification {:?} ran ({:?})",
                        job, notification, state
                    );
                })
            }),
        )
        .await?;
    scheduler.add(repeated).await?;

    // Feature 'signal' must be enabled for this call.
    scheduler.shutdown_on_ctrl_c();

    // Code to be run during/after shutdown.
    scheduler.set_shutdown_handler(Box::new(|| {
        Box::pin(async move {
            println!("Shut down done");
        })
    }));

    // Start the scheduler.
    scheduler.start().await?;

    Ok(())
}
/// Completes once a shutdown signal has been received.
///
/// Waits for Ctrl+C and, on Unix targets, additionally for the `terminate`
/// signal kind; on other targets the terminate branch never completes, so only
/// Ctrl+C ends the wait. Prints which branch fired.
///
/// # Panics
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    let interrupt = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        let mut term_stream = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        term_stream.recv().await;
    };

    // No terminate signal outside Unix: use a future that never resolves.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Whichever future finishes first decides the message printed.
    tokio::select! {
        _ = interrupt => println!("shutdown_signal"),
        _ = terminate => println!("terminating"),
    }
}

With this line left uncommented (as shown above), the problem is solved:

.with_graceful_shutdown(shutdown_signal())

It seems that if `sched.shutdown_on_ctrl_c();` is used, then the Axum server must register its own Ctrl+C handler as well.