hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go
MIT License

[FEATURE REQUEST] Add callback for task state changes #864

Open hungtcs opened 2 months ago


Is your feature request related to a problem? Please describe.

I'm currently working on a feature that notifies other services when a task has finished. Unfortunately, asynq doesn't seem to provide an API for this.

I currently do this via middleware, but it has some flaws and doesn't work well.

One key issue is that the task's life cycle has not actually ended when the middleware runs. If another service is notified at this point and then fetches the task status from the inspector, the task may still be reported as active.

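import (
    "context"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// TaskFinishEventMiddleware notifies other services once the wrapped handler returns.
func TaskFinishEventMiddleware() asynq.MiddlewareFunc {
    return func(h asynq.Handler) asynq.Handler {
        return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
            queueName, ok := asynq.GetQueueName(ctx)
            if !ok {
                return fmt.Errorf("cannot get queue name from task context")
            }
            taskID, ok := asynq.GetTaskID(ctx)
            if !ok {
                return fmt.Errorf("cannot get task ID from task context")
            }

            taskErr := h.ProcessTask(ctx, t)

            // Send notifications to other services to notify the end of the task.
            // The handler has returned, but the task's life cycle has not ended yet,
            // so the inspector may still report it as active.
            // The log call is only a placeholder for the real notification.
            log.Printf("task finished: queue=%s id=%s err=%v", queueName, taskID, taskErr)

            return taskErr
        })
    }
}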

Describe the solution you'd like

One solution is to add a global callback function that is called when the task state changes.

var server = asynq.NewServer(redisClientOpt, asynq.Config{
    Queues:   queues,
    Logger:   log.Log,
    LogLevel: logLevel,

    // Proposed new callback (does not exist in asynq yet)
    TaskStateChange: func(taskInfo *asynq.TaskInfo) {
        // React to the state change here, e.g. notify other services
    },
})
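
To make the intended ordering explicit, here is a rough, self-contained sketch of how such a callback could be dispatched. Everything in it is hypothetical: `TaskState`, `TaskInfo`, `Config`, and `store` are simplified stand-ins written for this example, not asynq's real types or internals. The point it tries to show is that the new state is stored first and the callback fires afterwards, so a listener that looks the task up sees the state it was just told about.

```go
package main

import "fmt"

// TaskState and TaskInfo are simplified stand-ins for illustration only;
// they are not asynq's real types.
type TaskState string

const (
	StateActive    TaskState = "active"
	StateCompleted TaskState = "completed"
)

type TaskInfo struct {
	ID    string
	Queue string
	State TaskState
}

// Config mirrors the proposed option: a single global callback.
type Config struct {
	TaskStateChange func(info *TaskInfo)
}

// store is a toy in-memory replacement for the broker's task state.
type store struct {
	cfg   Config
	tasks map[string]*TaskInfo
}

func newStore(cfg Config) *store {
	return &store{cfg: cfg, tasks: map[string]*TaskInfo{}}
}

// setState records the new state first and only then fires the callback.
func (s *store) setState(queue, id string, to TaskState) {
	info, ok := s.tasks[id]
	if !ok {
		info = &TaskInfo{ID: id, Queue: queue}
		s.tasks[id] = info
	}
	info.State = to
	if s.cfg.TaskStateChange != nil {
		snapshot := *info // pass a copy so the callback cannot mutate stored state
		s.cfg.TaskStateChange(&snapshot)
	}
}

func main() {
	var s *store
	s = newStore(Config{
		TaskStateChange: func(info *TaskInfo) {
			// The stored state already matches the notification.
			fmt.Printf("%s/%s -> %s (stored: %s)\n",
				info.Queue, info.ID, info.State, s.tasks[info.ID].State)
		},
	})
	s.setState("default", "task-1", StateActive)
	s.setState("default", "task-1", StateCompleted)
}
```

With this ordering, the notification and the stored state cannot disagree, which is exactly what the middleware approach cannot guarantee.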

Describe alternatives you've considered

I'm currently using middleware, but it's not working well.