hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go
MIT License
9.65k stars 696 forks source link

[BUG] Inaccurate timing of scheduled execution #806

Open shaMiaoMiao opened 9 months ago

shaMiaoMiao commented 9 months ago

I use tables in the database to control the number and triggering time of asynchronous scheduled tasks,

but when actually running it, I found that the scheduled triggering does not match the crontab expression I set. For example, with 0 10-22/6 ? I expect the tasks to execute at 10:00, 16:00, and 22:00 every day.

But in reality, the task was executed every six hours starting from 0:00. What could be the reason for this?

Below is the code I use to control the scheduled tasks through the Manager and the Worker:

Worker.go

package main

// RedisConf holds the Redis connection settings that main parses from
// schedule-redis.toml and passes to asynq.RedisClientOpt.
type RedisConf struct {
    // Host and Port are joined as "Host:Port" to form the Redis address.
    Host     string
    Port     int
    // Password is forwarded as the Redis client password.
    Password string
    // DB is forwarded as the Redis database selector.
    DB       int
}

// appConfig is the path of the app config file, set with the -conf flag
// (default "../../conf/app.toml"); main passes it to boot.MustLoadAppConfig.
var appConfig = flag.String("conf", "../../conf/app.toml", "app config file")

// init calls ssm.Init before main runs.
// NOTE(review): what ssm.Init sets up is not visible in this file — confirm.
func init() {
    ssm.Init()
}

// loggingMiddleware wraps h so that every task logs a line when processing
// starts and, if h succeeds, a second line with the elapsed time. An error
// from h is returned unchanged and no completion line is logged for it.
func loggingMiddleware(h asynq.Handler) asynq.Handler {
    logged := func(ctx context.Context, t *asynq.Task) error {
        began := time.Now()
        log.Printf("Start processing %q", t.Type())

        if procErr := h.ProcessTask(ctx, t); procErr != nil {
            return procErr
        }

        log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(began))
        return nil
    }
    return asynq.HandlerFunc(logged)
}

// main boots the application, starts the asynq worker server with the
// monitor task handler registered, and then blocks until a termination
// signal arrives.
//
// Signal handling:
//   - SIGTSTP: stop pulling new tasks but keep the process alive.
//   - SIGTERM / SIGINT: leave the wait loop and shut the server down.
func main() {
    flag.Parse()

    boot.MustLoadAppConfig(*appConfig)

    ctx, shutdown := bootstrap.SignalContext()
    defer shutdown()

    bootstrap.MustInit(ctx)

    var redisConf *RedisConf
    if err := conf.Parse("schedule-redis.toml", &redisConf); err != nil {
        // The server cannot be built without Redis settings; carrying on
        // would dereference a nil redisConf below.
        log.Fatalf("Failed to load redis config: %s", err)
    }
    if redisConf == nil {
        log.Fatal("redis config is empty")
    }

    srv := asynq.NewServer(
        asynq.RedisClientOpt{
            Addr:     redisConf.Host + ":" + strconv.Itoa(redisConf.Port),
            DB:       redisConf.DB,
            Password: redisConf.Password,
        },
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()
    mux.Use(loggingMiddleware)

    mux.HandleFunc(monitor.ScheduleTaskType, monitor.HandleMonitorTask)

    // Start the server without blocking; the signal loop below decides when
    // to stop it.
    if err := srv.Start(mux); err != nil {
        log.Fatalf("could not start server: %v", err)
    }

    // Wait for termination signal.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
    for {
        s := <-sigs
        if s == unix.SIGTSTP {
            // Stop only tells the server to stop processing new tasks; it
            // must be called before Shutdown, not after it.
            srv.Stop()
            continue
        }
        break // received SIGTERM or SIGINT
    }

    // Gracefully shut down the worker server.
    srv.Shutdown()
    shutdown()
}

Manager.go:

package main

// RedisConf holds the Redis connection settings that main parses from
// schedule-redis.toml and passes to asynq.RedisClientOpt.
type RedisConf struct {
    // Host and Port are joined as "Host:Port" to form the Redis address.
    Host     string
    Port     int
    // Password is forwarded as the Redis client password.
    Password string
    // DB is forwarded as the Redis database selector.
    DB       int
}

// appConfig is the path of the app config file, set with the -conf flag
// (default "../../conf/app.toml"); main passes it to boot.MustLoadAppConfig.
var appConfig = flag.String("conf", "../../conf/app.toml", "app config file")

// init calls ssm.Init before main runs.
// NOTE(review): what ssm.Init sets up is not visible in this file — confirm.
func init() {
    ssm.Init()
}

// MysqlBasedConfigProvider is the periodic task config provider handed to
// asynq.NewPeriodicTaskManager; its GetConfigs method builds the configs from
// mysql.GetTaskList. It carries no state of its own.
type MysqlBasedConfigProvider struct {
}

// GetConfigs returns one asynq.PeriodicTaskConfig for each entry yielded by
// mysql.GetTaskList, using the entry's CronSpec as the cron spec and its
// TaskType and Payload to build the task. The returned error is always nil.
func (p *MysqlBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
    var result []*asynq.PeriodicTaskConfig
    for _, row := range mysql.GetTaskList(context.Background()) {
        task := asynq.NewTask(row.TaskType, []byte(row.Payload))
        entry := &asynq.PeriodicTaskConfig{
            Cronspec: row.CronSpec,
            Task:     task,
        }
        result = append(result, entry)
    }
    return result, nil
}

// main boots the application and runs an asynq periodic task manager that
// re-reads its task configs from MysqlBasedConfigProvider every 5 seconds.
// mgr.Run blocks; main exits fatally if setup or Run fails.
func main() {
    flag.Parse()

    boot.MustLoadAppConfig(*appConfig)

    ctx, shutdown := bootstrap.SignalContext()
    defer shutdown()

    bootstrap.MustInit(ctx)

    provider := &MysqlBasedConfigProvider{}

    var redisConf *RedisConf
    if err := conf.Parse("schedule-redis.toml", &redisConf); err != nil {
        // The manager cannot be built without Redis settings; carrying on
        // would dereference a nil redisConf below.
        log.Fatalf("Failed to load redis config: %s", err)
    }
    if redisConf == nil {
        log.Fatal("redis config is empty")
    }

    mgr, err := asynq.NewPeriodicTaskManager(
        asynq.PeriodicTaskManagerOpts{
            RedisConnOpt: asynq.RedisClientOpt{
                Addr:     redisConf.Host + ":" + strconv.Itoa(redisConf.Port),
                Password: redisConf.Password,
                DB:       redisConf.DB,
            },
            PeriodicTaskConfigProvider: provider,
            SyncInterval:               5 * time.Second,
        })
    if err != nil {
        // Previously discarded; a failed construction left mgr nil and
        // mgr.Run would panic.
        log.Fatalf("could not create periodic task manager: %v", err)
    }

    if err := mgr.Run(); err != nil {
        log.Fatal(err)
    }
    shutdown()
}

Hope to receive help, thank you~

kamikazechaser commented 9 months ago

What exactly is your cron expression? Have you tried 0 10,16,22 * * *?