RichardKnop / machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.
Mozilla Public License 2.0

Issue on multiple workers, task will be duplicated #633

Open xxxyangyu opened 3 years ago

xxxyangyu commented 3 years ago

I configured a local Redis instance as both broker and result backend, and registered a task that fails at random. If I run multiple workers (count > 1) in a single Go process, a failed task that gets retried is duplicated once per worker on every retry. If I run only one worker per process, spread across multiple Go processes, the problem does not occur.

func InitServer() {
    taskMetaMap = make(map[string]*TaskMeta)
    cnf := &config.Config{
        DefaultQueue:    "machinery_tasks",
        ResultsExpireIn: 3600,
        Redis: &config.RedisConfig{
            MaxIdle:                3,
            IdleTimeout:            240,
            ReadTimeout:            15,
            WriteTimeout:           15,
            ConnectTimeout:         15,
            NormalTasksPollPeriod:  1000,
            DelayedTasksPollPeriod: 500,
        },
    }
    broker := redisbroker.NewGR(cnf, []string{"localhost:6379"}, 0)
    backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 0)
    lock := eagerlock.New()
    server = machinery.NewServer(cnf, broker, backend, lock)
    log.Info("Success to start workflow server")
}

Here is the simple task, sent with a retry count of 1000, followed by the worker setup:

func RandomError() error {
    // Succeed roughly one time in three; otherwise fail so the task is retried.
    if rand.Intn(3) == 0 {
        return nil
    }
    return errors.New("random err in function")
}
func InitWorker(workerCount int) chan error {
    workerConcurrency := 50
    workers = make([]*machinery.Worker, workerCount)
    errorsChan := make(chan error)
    for i := 0; i < workerCount; i++ {
        workers[i] = server.NewWorker(workerTag+strconv.Itoa(i), workerConcurrency)
        go workers[i].LaunchAsync(errorsChan)
        log.WithFields(logrus.Fields{
            "Tag": workerTag + strconv.Itoa(i),
        }).Info("success to start workflow worker")
    }
    return errorsChan
}

If workerCount is greater than 1, the delayed (retried) task is duplicated. In Redis:

task_5fa463c2-2d33-46ad-9999-e7a88de3cdde {"TaskUUID":"task_5fa463c2-2d33-46ad-9999-e7a88de3cdde","TaskName":"randomError","State":"SUCCESS","Results":[],"Error":"","CreatedAt":"2020-12-16T11:57:31.701169Z","TTL":0}

delayed_tasks [
{"score":"1.6081224232404291e+18","value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:23.240429Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":94,\"RetryTimeout\":13,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}"},
{"value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:23.751703Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":94,\"RetryTimeout\":13,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}","score":"1.608122423751703e+18"},
{"score":"1.6081224242632399e+18","value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:24.26324Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":94,\"RetryTimeout\":13,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}"},
{"value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:43.952985Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":93,\"RetryTimeout\":21,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}","score":"1.6081224439529851e+18"},
{"score":"1.6081224444658529e+18","value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:44.465853Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":93,\"RetryTimeout\":21,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}"},
{"value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:44.465914Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":93,\"RetryTimeout\":21,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}","score":"1.6081224444659141e+18"},
{"value":"{\"UUID\":\"task_95cbadbb-3914-4b38-8da5-f999a0e9a5d3\",\"Name\":\"randomError\",\"RoutingKey\":\"machinery_tasks\",\"ETA\":\"2020-12-16T12:40:44.978798Z\",\"GroupUUID\":\"\",\"GroupTaskCount\":0,\"Args\":[],\"Headers\":{},\"Priority\":0,\"Immutable\":false,\"RetryCount\":93,\"RetryTimeout\":21,\"OnSuccess\":null,\"OnError\":null,\"ChordCallback\":null,\"BrokerMessageGroupId\":\"\",\"SQSReceiptHandle\":\"\",\"StopTaskDeletionOnError\":false,\"IgnoreWhenTaskNotRegistered\":false}","score":"1.6081224449787981e+18"}
]
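
In the dump above, the same task UUID is queued three times with RetryCount 94 and four times with RetryCount 93. One way to check this on a larger dump is to group the entries by UUID and RetryCount. The following is only a minimal sketch using the Go standard library; it assumes the dump is a JSON array of score/value pairs as shown above, and the names countRetryCopies, zEntry, signature and retryKey are hypothetical helpers, not part of machinery:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// zEntry mirrors one element of the delayed_tasks dump: the serialized
// task signature (value) together with its sorted-set score.
// Hypothetical helper type, not part of machinery.
type zEntry struct {
	Score string `json:"score"`
	Value string `json:"value"`
}

// signature holds only the fields needed to spot duplicates.
type signature struct {
	UUID       string
	RetryCount int
}

// retryKey identifies one retry attempt of one task.
type retryKey struct {
	UUID       string
	RetryCount int
}

// countRetryCopies parses a delayed_tasks dump and counts how many
// entries share the same task UUID and RetryCount.
func countRetryCopies(dump []byte) (map[retryKey]int, error) {
	var entries []zEntry
	if err := json.Unmarshal(dump, &entries); err != nil {
		return nil, err
	}
	counts := make(map[retryKey]int)
	for _, e := range entries {
		var sig signature
		// The value field is itself a JSON document stored as a string.
		if err := json.Unmarshal([]byte(e.Value), &sig); err != nil {
			return nil, err
		}
		counts[retryKey{UUID: sig.UUID, RetryCount: sig.RetryCount}]++
	}
	return counts, nil
}

func main() {
	// Shortened, made-up sample in the same shape as the dump above.
	dump := []byte(`[
		{"score":"1","value":"{\"UUID\":\"task_a\",\"RetryCount\":94}"},
		{"score":"2","value":"{\"UUID\":\"task_a\",\"RetryCount\":94}"},
		{"score":"3","value":"{\"UUID\":\"task_a\",\"RetryCount\":93}"}
	]`)
	counts, err := countRetryCopies(dump)
	if err != nil {
		panic(err)
	}
	// Report every retry attempt that is queued more than once.
	for k, n := range counts {
		if n > 1 {
			fmt.Printf("%s retry %d is queued %d times\n", k.UUID, k.RetryCount, n)
		}
	}
}
```

With a single worker, each UUID and RetryCount pair would be expected to appear at most once, so any count above 1 points at the duplication described here.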

JinXinAuzn commented 3 years ago

I am running into the same kind of problem.

MikeLing commented 3 years ago

The duplicated-task problem still exists.

MikeLing commented 3 years ago

@liaojie Hi, do you have any time to look into this issue? Thank you.