Open leissen opened 3 years ago
It happened to me too
@james-deee @rickfish Since you use the mysql-persistence for storing executions, could you please help reproduce/debug this issue? Thanks.
I use postgres-persistence but I can see if it happens on our system
@leissen what does your worker do with the task when it gets it? Does it set the task status? Does it set callbackAfterSeconds?
@rickfish this is my worker code. I just use this sample to reproduce the issue. Thanks.
@Override
public TaskResult execute(Task task) {
    TaskResult result = new TaskResult();
    String a = task.getInputData().get("inputA").toString();
    String b = task.getInputData().get("inputB").toString();
    boolean isSuccess = a.equalsIgnoreCase("a/true");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (isSuccess) {
        LOGGER.info("a task execute success!");
        result.setStatus(TaskResult.Status.COMPLETED);
    } else {
        LOGGER.info("a task execute fail!");
        result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
    }
    if (result.getStatus() == TaskResult.Status.COMPLETED) {
        result.getOutputData().put("a", "a");
        LOGGER.info("a task execute success!:" + task.getTaskId());
    } else {
        LOGGER.info("a task execute fail!:" + task.getTaskId());
    }
    return result;
}
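For anyone trying to confirm the duplicate delivery from the worker side, here is a minimal sketch of a detector keyed on the task ID that the sample above already logs via task.getTaskId(). The class name DuplicateTaskDetector and its methods are hypothetical and not part of Conductor. It only sees duplicates within a single worker process, so with several worker processes the set would have to live in some shared store instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not a Conductor class: remembers task IDs seen by this JVM.
class DuplicateTaskDetector {
    // Thread-safe set, so all worker threads in one process can share a single instance.
    private final Set<String> seenTaskIds = ConcurrentHashMap.newKeySet();
    private final AtomicLong duplicates = new AtomicLong();

    // Returns true the first time a task ID is seen, false on every repeat.
    boolean firstSeen(String taskId) {
        boolean added = seenTaskIds.add(taskId);
        if (!added) {
            duplicates.incrementAndGet();
        }
        return added;
    }

    // Number of repeated deliveries observed so far in this process.
    long duplicateCount() {
        return duplicates.get();
    }
}
```

One way to use it would be to call firstSeen(task.getTaskId()) at the top of execute(...) and log a warning when it returns false. This only detects the problem; it does not fix the queue behaviour on the server.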
We also run conductor on top of mysql and have experienced this issue. Anyone working on this issue?
@kartaa The MySQL persistence module is community contributed, and it looks like the original contributors are no longer actively maintaining it. This issue is not being actively looked into.
I'm also experiencing this problem using mysql. Did you solve it by any chance? @kartaa
Just curious if anyone also is trying to make use of Redis/Zookeeper Locking too? I'm not sure if that's a solution, but am wondering if anyone hitting this issue is also using Locking as well.
No we have not worked on a solution for this issue.
Describe the bug
Different worker threads sometimes poll the same task message. In other words, the same task can be polled twice by different worker threads, and I see the following logs on the Conductor server.
[INFO ] 2021-05-27 17:57:24,154 MySQLExecutionDAO - Task execution count for TaskA: limit=100000, current=111
[INFO ] 2021-05-27 17:57:24,163 MySQLExecutionDAO - Task execution count for TaskA: limit=100000, current=111
[INFO ] 2021-05-27 17:57:25,429 WorkflowExecutor - Task: 637fa346-fca2-4aaa-b102-99b88797d530 has already finished execution with status: COMPLETED within workflow: 2f1d2764-845e-4031-8e37-75fe5fa1d4d3. Removed task from queue: TaskA
[INFO ] 2021-05-27 17:57:25,792 MySQLExecutionDAO - Task execution count for TaskA: limit=100000, current=114
[INFO ] 2021-05-27 17:57:26,288 MySQLExecutionDAO - Task execution count for TaskA: limit=100000, current=115
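To get a rough count of how often this happens, the WorkflowExecutor "has already finished execution" line can be pulled out of the server log. The sketch below is only an illustration: the class name AlreadyFinishedLogScanner is hypothetical, and the regular expression is derived solely from the one log line quoted above, so other Conductor versions may word the message differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts task IDs from "has already finished execution" log lines.
class AlreadyFinishedLogScanner {
    // Pattern based on the single WorkflowExecutor line shown in this report (an assumption).
    private static final Pattern ALREADY_FINISHED = Pattern.compile(
            "Task: (\\S+) has already finished execution with status: (\\w+) within workflow: (\\S+?)\\. Removed task from queue");

    // Returns the task ID of every matching line, in log order.
    static List<String> findTaskIds(List<String> logLines) {
        List<String> taskIds = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = ALREADY_FINISHED.matcher(line);
            if (m.find()) {
                taskIds.add(m.group(1));
            }
        }
        return taskIds;
    }
}
```

Feeding it the lines of the server log (for example from Files.readAllLines) gives the IDs of tasks that were still in the queue after they had already completed, which can then be compared against the task IDs the workers logged.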
I have only one Conductor server instance, using MySQL for persistence, and 4 worker processes with 100 threads each. I submit 300 workflow instances and hit this issue after some time, while the workers are processing the tasks. With only 1 worker process running, the issue does not occur. It seems that the same task message is consumed repeatedly by different worker processes.
I define only one task, and one workflow that contains only this task.
Task:
[
  {
    "name": "TASKA",
    "retryCount": 0,
    "timeoutSeconds": 86400,
    "inputKeys": [
      "inputA",
      "inputB"
    ],
    "outputKeys": [
      "outputA",
      "outputB"
    ],
    "timeoutPolicy": "TIME_OUT_WF",
    "retryLogic": "FIXED",
    "retryDelaySeconds": 1,
    "responseTimeoutSeconds": 86400,
    "concurrentExecLimit": 100000,
    "inputTemplate": {},
    "rateLimitPerFrequency": 50,
    "rateLimitFrequencyInSeconds": 60,
    "ownerEmail": "encode_admin@test.com",
    "pollTimeoutSeconds": 86400
  }
]
Workflow:
{
  "name": "my_workflow",
  "description": "workflow test",
  "version": 1,
  "tasks": [
    {
      "name": "TaskA",
      "taskReferenceName": "task1",
      "type": "SIMPLE",
      "inputParameters": {
        "inputA": "${workflow.input.inputA}",
        "inputB": "${workflow.input.inputB}"
      }
    }
  ],
  "outputParameters": {},
  "restartable": true,
  "workflowStatusListenerEnabled": true,
  "ownerEmail": "admin@google.com",
  "schemaVersion": 2
}