hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go
MIT License
9.49k stars 688 forks source link

[BUG] asynq.TaskID does not work for periodic tasks #912

Open bayoumymac opened 1 month ago

bayoumymac commented 1 month ago

Describe the bug asynq.TaskID doesn't work for periodic tasks as it can only be passed when setting up the scheduler

To Reproduce setting up a scheduler using the following, does not generate a separate taskId per periodic task enqueued

        scheduler.Register(
        interval,
        asynq.NewTask(task, nil),
        asynq.Queue(queue),
        asynq.TaskID(primitive.NewObjectID().Hex()), // mongodb object id as task id
    )

Expected behavior ability to set taskId for periodic tasks, there are multiple solutions

  1. change the signature of preEnqueueFunc to return *asynq.Task (most flexible, allows to customize all different attributes)
  2. add a scheduler option such as TaskOpts which can be a func with the following signature func() []asynq.Option {} where it gets executed for every task

bonus: have a validation check on options passed to scheduler.Register and fail if option is not supported

Environment (please complete the following information):

Additional context currently this solution works but but definitely is an anti pattern

        scheduler := asynq.NewScheduler(opts, &asynq.SchedulerOpts{
            PreEnqueueFunc: func(task *asynq.Task, opts []asynq.Option) {
                if task != nil {
                    *task = *asynq.NewTask(
                                            task.Type(),
                                            task.Payload(),
                                            asynq.TaskID(primitive.NewObjectID().Hex()), // mongodb object id as task id
                                        )
                }
            },
            PostEnqueueFunc: func(task *asynq.TaskInfo, err error) {
                logger.Debugf("enqueued task %s", task.ID) // logs the correct task id
                if err != nil {
                    logger.Errorf("error while enqueuing task %s", err)
                }
            },
        })