conductor-oss / conductor

Conductor is an event driven orchestration platform
https://conductor-oss.org
Apache License 2.0

Dynamic fork does not support array input as in Orkes version #20

Closed george-kirkman closed 5 months ago

george-kirkman commented 11 months ago

**Describe the bug**
The Orkes Conductor docs (currently the only docs available for Conductor unless you dig through the archived Netflix repo) describe a simple usage pattern for a Dynamic Fork: you give it a single task (or sub-workflow) name and an array of inputs, and that task is executed once per input.

This behaviour is not supported in the OSS version of Conductor. We are instead forced to add a preceding inline/worker task that generates the dynamic task JSON definitions and task input mappings. This is overly complex and results in a huge amount of boilerplate JSON for an array of 100+ items that all need the same task or sub-workflow executed.
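To illustrate the boilerplate involved, here is a minimal sketch of what such a preceding worker task has to produce. It assumes the output keys match the `dynamicForkTasksParam` (`dynamicTasks`) and `dynamicForkTasksInputParamName` (`dynamicTasksInput`) values from the workflow definition below. The function name `build_dynamic_fork_inputs`, the generated reference names, and the `Number` input key (borrowed from the task's `inputTemplate`) are illustrative assumptions, not part of Conductor.

```python
from typing import Any


def build_dynamic_fork_inputs(
    task_name: str, items: list[Any], input_key: str = "Number"
) -> dict[str, Any]:
    """Expand an array of inputs into one task definition and one input map per item."""
    dynamic_tasks = []
    dynamic_tasks_input = {}
    for index, item in enumerate(items):
        # Each forked task needs a unique reference name (hypothetical naming scheme).
        reference_name = f"{task_name}_{index}"
        dynamic_tasks.append(
            {
                "name": task_name,
                "taskReferenceName": reference_name,
                "type": "SIMPLE",
            }
        )
        # The input map is keyed by the task reference name.
        dynamic_tasks_input[reference_name] = {input_key: item}
    return {
        "dynamicTasks": dynamic_tasks,
        "dynamicTasksInput": dynamic_tasks_input,
    }


result = build_dynamic_fork_inputs("add_one", [1, 2, 3, 4])
```

The dynamic fork's `inputParameters` would then have to reference this task's output for both keys, which is the extra wiring the array-input form avoids.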

**Details**
Using the default Conductor standalone Docker image, taken from the OSS Conductor ReadMe.

- Conductor version: OSS 3.15
- Persistence implementation: default (Redis)
- Queue implementation: default (Postgres)
- Lock: default (Redis?)

Workflow definition:

{
  "createTime": 1702551738787,
  "updateTime": 1703000317219,
  "name": "DynamicForkTest",
  "description": "Very simple dynamic fork",
  "version": 1,
  "tasks": [
    {
      "name": "dynamic_task",
      "taskReferenceName": "dynamic_task_ref",
      "inputParameters": {
        "forkTaskName": "add_one",
        "forkTaskInputs": "${workflow.input.Items}"
      },
      "type": "FORK_JOIN_DYNAMIC",
      "decisionCases": {},
      "dynamicForkTasksParam": "dynamicTasks",
      "dynamicForkTasksInputParamName": "dynamicTasksInput",
      "defaultCase": [],
      "forkTasks": [],
      "startDelay": 0,
      "joinOn": [],
      "optional": false,
      "defaultExclusiveJoinTask": [],
      "asyncComplete": false,
      "loopOver": [],
      "onStateChange": {}
    },
    {
      "name": "join_task",
      "taskReferenceName": "join_task_ref",
      "inputParameters": {},
      "type": "JOIN",
      "decisionCases": {},
      "defaultCase": [],
      "forkTasks": [],
      "startDelay": 0,
      "joinOn": [],
      "optional": false,
      "defaultExclusiveJoinTask": [],
      "asyncComplete": false,
      "loopOver": [],
      "onStateChange": {}
    }
  ],
  "inputParameters": [
    "Items"
  ],
  "outputParameters": {
    "Output": "${join_task_ref.output}"
  },
  "failureWorkflow": "",
  "schemaVersion": 2,
  "restartable": true,
  "workflowStatusListenerEnabled": false,
  "ownerEmail": "***",
  "timeoutPolicy": "ALERT_ONLY",
  "timeoutSeconds": 0,
  "variables": {},
  "inputTemplate": {},
  "onStateChange": {}
}

Task definition:

{
  "createTime": 1702551592911,
  "createdBy": "***",
  "name": "add_one",
  "description": "Super simple, adds 1 to input",
  "retryCount": 3,
  "timeoutSeconds": 3600,
  "inputKeys": [],
  "outputKeys": [],
  "timeoutPolicy": "TIME_OUT_WF",
  "retryLogic": "FIXED",
  "retryDelaySeconds": 60,
  "responseTimeoutSeconds": 600,
  "concurrentExecLimit": 0,
  "inputTemplate": {
    "Number": 1
  },
  "rateLimitPerFrequency": 0,
  "rateLimitFrequencyInSeconds": 1,
  "ownerEmail": "***",
  "pollTimeoutSeconds": 3600,
  "backoffScaleFactor": 1
}

Event handler definition: None

**To Reproduce**
Steps to reproduce the behaviour:

1. Load up local standalone OSS Conductor via Docker:

    docker volume create postgres
    docker volume create redis

    docker run --init -p 8080:8080 -p 1234:5000 --mount source=redis,target=/redis \
      --mount source=postgres,target=/pgdata conductoross/conductor-standalone:3.15.0

2. Add definitions above (no need to implement the worker to reproduce the bug, we won't make it that far)
3. Execute the workflow with some array input, e.g. `{"Items":[1,2,3,4]}` (the key must match the `Items` input parameter that `forkTaskInputs` reads)
4. See error: `Input to the dynamically forked tasks is not a map -> expecting a map of K,V but found null`
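
Step 3 can also be scripted. The sketch below builds the HTTP request for starting the workflow. It assumes the server from step 1 is reachable at `http://localhost:8080` and that the start endpoint is `POST /api/workflow/{name}` with the workflow input as the JSON body. The payload key follows the `Items` input parameter declared in the workflow definition, and the helper name `build_start_request` is hypothetical.

```python
import json
import urllib.request


def build_start_request(
    base_url: str, workflow_name: str, workflow_input: dict
) -> urllib.request.Request:
    """Build (but do not send) the request that starts a workflow execution."""
    # Assumed endpoint shape: POST {base_url}/api/workflow/{workflow_name}
    url = f"{base_url}/api/workflow/{workflow_name}"
    body = json.dumps(workflow_input).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_start_request(
    "http://localhost:8080", "DynamicForkTest", {"Items": [1, 2, 3, 4]}
)

# To send it against a running server, uncomment:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Building the request separately from sending it makes the payload easy to inspect before it reaches the server.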

**Expected behaviour**
Input array should get split into 4 separate tasks (where 4 is array length) and processed concurrently in dynamic fork. As in [Orkes playground test run](https://play.orkes.io/execution/c278d24f-9e84-11ee-87d2-4a17fa5784e3).

**Screenshots**
Expected:
<img width="200" alt="image" src="https://github.com/conductor-oss/conductor/assets/93911737/46cc4a5b-6d9e-4aff-b19a-9e3119b2d27e">

Observed in OSS Conductor:
<img width="300" alt="image" src="https://github.com/conductor-oss/conductor/assets/93911737/129710a9-600d-49ed-99f1-6d19551e50e0">

george-kirkman commented 11 months ago

It looks like @saksham2105 may have already jumped on this issue with a PR: https://github.com/conductor-oss/conductor/pull/19

Thanks!