Netflix / conductor

Conductor is a microservices orchestration engine.

Making JOIN Task Async failing some workflows #3528

Closed: vamsibandarupalli closed this issue 1 year ago

vamsibandarupalli commented 1 year ago

Describe the bug: After making the JOIN task async, some workflows fail to execute successfully; the JOIN task fails with a timeout. If a workflow has DECISION tasks inside FORK_JOIN branches, the whole workflow fails because the JOIN task fails with a responseTimeout error. In the workflow provided below, the "mock_task" task definition has a 300-second response timeout, yet the JOIN fails even before it actually starts.

Details
Conductor version: main branch
Persistence implementation: Dynomite
Queue implementation: Dynoqueues
Lock: Redis

Workflow definition:

```json
{
  "createTime": 1678401621130,
  "updateTime": 1678308058828,
  "accessPolicy": {},
  "name": "fd-csd-flow_mock2",
  "description": "fd-csd-flow",
  "version": 1,
  "tasks": [
    { "name": "mock_task", "taskReferenceName": "InputJQ", "inputParameters": { "queryExpression": ".JSONJQINPUT | {filters:[.filters[] | {repository :.repository, \"filters\":[.fields[] | {(.name):.values}] | add}],\"filterflag\": (.filters | length > 0)}", "JSONJQINPUT": "${workflow.input.body}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false },
    { "name": "mock_task", "taskReferenceName": "fork-join-0", "inputParameters": {}, "type": "FORK_JOIN",
      "forkTasks": [
        [
          { "name": "mock_task", "taskReferenceName": "ARSCInput", "inputParameters": { "queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"arsc\") | .filters else {} end", "JSONJQINPUT": "${InputJQ.output.result}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "ARSCDecision", "inputParameters": { "filter": "${InputJQ.output.result.filters}", "filterflag": "${InputJQ.output.result.filterflag}" }, "type": "DECISION", "caseExpression": "if (!$.filterflag ){'arsc';}else {for(var i = 0; i <$.filter.length; i++) { if ($.filter[i].repository == 'arsc'){ 'arsc'; break; }}}", "decisionCases": { "arsc": [ { "name": "mock_task", "taskReferenceName": "ARSCTask", "inputParameters": { "http_request": { "uri": "http://httpbin.org/anything", "method": "POST", "body": "${workflow.input.body}" } }, "type": "HTTP", "startDelay": 0, "optional": false, "asyncComplete": false }, { "name": "mock_task", "taskReferenceName": "ARSCJQT", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": { "objectType": "arsc", "response": "${ARSCTask.output.response.body}" } }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false } ] }, "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "ARSCDecision_join", "inputParameters": { "output": "${ARSCJQT.output.result}", "ARSCTaskOutput": "${ARSCTask.output.response.body}" }, "type": "SIMPLE", "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "ARSCFilter", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": "${ARSCDecision_join.output.response.body.output}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false }
        ],
        [
          { "name": "mock_task", "taskReferenceName": "OEEHInput", "inputParameters": { "queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"oeeh\") | .filters else {} end", "JSONJQINPUT": "${InputJQ.output.result}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "OEEHDecision", "inputParameters": { "filter": "${InputJQ.output.result.filters}", "filterflag": "${InputJQ.output.result.filterflag}" }, "type": "DECISION", "caseExpression": "if (!$.filterflag ){\n\t'oeeh';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'oeeh'){ \n 'oeeh';\n break;\n }\n}\n}", "decisionCases": { "oeeh": [ { "name": "mock_task", "taskReferenceName": "OEEHTask", "inputParameters": { "http_request": { "uri": "http://httpbin.org/anything", "method": "POST", "body": "${workflow.input.body}" } }, "type": "HTTP", "startDelay": 0, "optional": false, "asyncComplete": false }, { "name": "mock_task", "taskReferenceName": "OEEHJQT", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": { "objectType": "oeeh", "response": "${OEEHTask.output.response.body}" } }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false } ] }, "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "OEEHDecision_join", "inputParameters": { "output": "${OEEHJQT.output.result}", "OEEHTaskOutput": "${OEEHTask.output.response.body}" }, "type": "SIMPLE", "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "OEEHFilter", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": "${OEEHDecision_join.output.response.body.output}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false }
        ],
        [
          { "name": "mock_task", "taskReferenceName": "APSVInput", "inputParameters": { "queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"apsv\") | .filters else {} end", "JSONJQINPUT": "${InputJQ.output.result}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "APSVDecision", "inputParameters": { "filter": "${InputJQ.output.result.filters}", "filterflag": "${InputJQ.output.result.filterflag}" }, "type": "DECISION", "caseExpression": "if (!$.filterflag ){\n\t'apsv';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'apsv'){ \n 'apsv';\n break;\n }\n}\n}", "decisionCases": { "apsv": [ { "name": "mock_task", "taskReferenceName": "APSVTask", "inputParameters": { "http_request": { "uri": "http://httpbin.org/anything", "method": "POST", "body": "${workflow.input.body}" } }, "type": "HTTP", "startDelay": 0, "optional": false, "asyncComplete": false }, { "name": "mock_task", "taskReferenceName": "APSVJQT", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": { "objectType": "apsv", "response": "${APSVTask.output.response.body}" } }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false } ] }, "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "APSVDecision_join", "inputParameters": { "output": "${APSVJQT.output.result}", "APSVTaskOutput": "${APSVTask.output.response.body}" }, "type": "SIMPLE", "startDelay": 0, "optional": false, "asyncComplete": false },
          { "name": "mock_task", "taskReferenceName": "APSVFilter", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": "${APSVDecision_join.output.response.body.output}" }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false }
        ]
      ],
      "startDelay": 0, "optional": false, "asyncComplete": false },
    { "name": "mock_task", "taskReferenceName": "join-0", "inputParameters": {}, "type": "JOIN", "startDelay": 0, "joinOn": [ "ARSCFilter", "OEEHFilter", "APSVFilter" ], "optional": false, "asyncComplete": false },
    { "name": "mock_task", "taskReferenceName": "CSDMerge", "inputParameters": { "queryExpression": ".JSONJQINPUT | .", "JSONJQINPUT": { "arsc_jq": "${ARSCFilter.output.result}", "oeeh_jq": "${OEEHFilter.output.result}", "apsv_jq": "${APSVFilter.output.result}" } }, "type": "JSON_JQ_TRANSFORM", "startDelay": 0, "optional": false, "asyncComplete": false }
  ],
  "inputParameters": [],
  "outputParameters": { "searchText": "${workflow.input.body.text}", "applications": [ "${CSDMerge.output.result}" ] },
  "schemaVersion": 2,
  "restartable": true,
  "workflowStatusListenerEnabled": false,
  "ownerEmail": "test@gmail.com",
  "timeoutPolicy": "ALERT_ONLY",
  "timeoutSeconds": 0,
  "variables": {},
  "inputTemplate": {}
}
```

Task definition:

```json
{
  "createTime": 1678399938459,
  "createdBy": "",
  "accessPolicy": {},
  "name": "mock_task",
  "description": "mock task request",
  "retryCount": 0,
  "timeoutSeconds": 0,
  "inputKeys": [],
  "outputKeys": [],
  "timeoutPolicy": "TIME_OUT_WF",
  "retryLogic": "FIXED",
  "retryDelaySeconds": 2,
  "responseTimeoutSeconds": 300,
  "inputTemplate": {},
  "rateLimitPerFrequency": 0,
  "rateLimitFrequencyInSeconds": 1,
  "ownerEmail": "test@test.com",
  "backoffScaleFactor": 1
}
```

To Reproduce
Steps to reproduce the behavior:

  1. Use the workflow and task definitions given above to reproduce the issue.
  2. Use this input for the workflow (a Java client sketch for starting the workflow with this input follows this list): { "body": { "text": "101", "version": 3, "logicalId": "lid://demo-products.1", "page_number": 1, "page_size": 20, "userContext": null, "filters": [] } }
  3. See the error:

```
conductor-server_1 | 2782317 [http-nio-8080-exec-3] INFO com.netflix.conductor.core.execution.WorkflowExecutor [] - Execution terminated of workflow: fd-csd-flow_mock2.1/1e79652f-0a2c-40d2-a9e9-9acec064cb4e.RUNNING
conductor-server_1 | com.netflix.conductor.core.exception.TerminateWorkflowException: responseTimeout: 300 exceeded for the taskId: cc8dc3ff-3e09-406a-9867-ab7c6cc7747f with Task Definition: JOIN
conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.retry(DeciderService.java:548) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:205) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:109) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1007) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.handleWorkflowEvaluationEvent(WorkflowExecutor.java:962) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
```
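As referenced in step 2, here is a hedged sketch of starting the workflow with that input through the standard Conductor Java client; the server URL and class name are assumptions, and a plain POST to /api/workflow/fd-csd-flow_mock2 achieves the same thing.

```java
// Hedged sketch of step 2: start "fd-csd-flow_mock2" with the input shown above.
import com.netflix.conductor.client.http.WorkflowClient;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class StartMockWorkflow {
    public static void main(String[] args) {
        WorkflowClient workflowClient = new WorkflowClient();
        workflowClient.setRootURI("http://localhost:8080/api/");

        // Same input as step 2 above.
        Map<String, Object> body = new HashMap<>();
        body.put("text", "101");
        body.put("version", 3);
        body.put("logicalId", "lid://demo-products.1");
        body.put("page_number", 1);
        body.put("page_size", 20);
        body.put("userContext", null);
        body.put("filters", Collections.emptyList());

        Map<String, Object> input = new HashMap<>();
        input.put("body", body);

        StartWorkflowRequest request = new StartWorkflowRequest();
        request.setName("fd-csd-flow_mock2");
        request.setVersion(1);
        request.setInput(input);

        String workflowId = workflowClient.startWorkflow(request);
        System.out.println("Started workflow: " + workflowId);
    }
}
```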

Expected behavior: The JOIN task is expected to run without error.

Additional context: When the workflow is started, the JOIN task is the next task scheduled here. The JOIN is getting scheduled even before the taskUpdated time is set, so the JOIN task fails this check here and times out.
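To make the suspected race easier to follow, here is an illustrative, self-contained paraphrase of the timing check being described; this is not the actual DeciderService code, and the class and helper names are hypothetical.

```java
// Illustrative paraphrase of the responseTimeout check described above.
// NOT the actual Conductor source; names are hypothetical.
public class ResponseTimeoutRaceSketch {

    public static void main(String[] args) {
        long responseTimeoutSeconds = 300;  // from the mock_task definition

        // Hypothetical scenario: the async JOIN was scheduled a while ago, but its
        // "updated" timestamp was never refreshed afterwards, so elapsed time is
        // measured against a stale value.
        long staleUpdateTime = System.currentTimeMillis() - 301_000;

        if (isResponseTimedOut(staleUpdateTime, responseTimeoutSeconds)) {
            // In the logs above this surfaces as DeciderService terminating the workflow
            // with "responseTimeout: 300 exceeded for the taskId ... with Task Definition: JOIN".
            System.out.println("JOIN would be marked as timed out here");
        }
    }

    // Hypothetical helper mirroring the shape of the check, not Conductor's implementation.
    static boolean isResponseTimedOut(long lastUpdateTimeMillis, long responseTimeoutSeconds) {
        long elapsedMillis = System.currentTimeMillis() - lastUpdateTimeMillis;
        return elapsedMillis > responseTimeoutSeconds * 1000;
    }
}
```

Under this reading, the JOIN times out not because a worker was slow but because the timestamp the check is measured against was never refreshed after the JOIN was scheduled.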

I'll try to debug and will raise a merge request if I find anything.

james-deee commented 1 year ago

@vamsibandarupalli there is also a pretty big performance issue that was introduced around JOIN in the newer versions of Conductor. See the discussion here: https://github.com/Netflix/conductor/discussions/3436#discussioncomment-5485710

Unfortunately, it isn't getting much of a response from the maintainers/community here :(

v1r3n commented 1 year ago

https://github.com/Netflix/conductor/pull/3594 addresses this.

Holmesus commented 11 months ago

There is still this problem...

v1r3n commented 10 months ago

> There is still this problem...

Hi @Holmesus, can you provide more details?