arcalot / arcaflow-engine

Arcaflow is a highly-portable workflow engine enabling modular and validated pipelines through containerized plugins.
https://arcalot.io/arcaflow/
Apache License 2.0

Unable to use output of one subworkflow as the input of another subworkflow #165

Closed mfleader closed 3 months ago

mfleader commented 3 months ago

Describe the bug

When I try to use the list of outputs from a foreach subworkflow as the input of another subworkflow, the workflow fails with a validation error: the output object of the first subworkflow is assigned a random ID that does not match the object ID in the input schema of the second subworkflow.

2024-01-29T18:14:14+01:00   error   source=main Invalid workflow (input validation failed for workflow step 'cpu_index_loop' stage 'execute' (Validation failed: error while validating sub-type of property Items with type *schema.ListSchema (Validation failed: validation failed for object schema ID IndexObject. ID qb5jeq63hl52vihbj04tnbwq0mem4ybj does not match.)))

I was advised to define an outputSchema for the first subworkflow so that its ID is set explicitly and matches the input of the second subworkflow. I added this to the first subworkflow:

...
outputSchema:
  success:
    schema:
      root: IndexObject
      objects:
        IndexObject:
          id: IndexObject
          properties:
            pcp:
              type:
                type_id: list
                items:
                  type_id: any

outputs:
  success:
    pcp: !expr $.steps.pcp.outputs.success.pcp_output

With this change, the workflow fails with a new error, which may be a bug in the Go SDK:

2024-01-29T18:16:15+01:00   error   source=main Invalid workflow (invalid workflow (failed to load schema for step sysbench_cpu_loop (invalid workflow (Validation failed for 'outputSchema': Field cannot be set (reflect.Value.Convert: value of type map[string]*schema.StepOutputSchema cannot be converted to type map[string]interface {})))))
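For context on the reflect message above, here is a minimal Go sketch of the general language behavior, not the engine's or SDK's actual code. Go does not allow converting a map with a concrete value type to map[string]interface{} in one step, so reflect.Value.Convert panics on it, and the usual workaround is to copy the map entry by entry. The StepOutputSchema struct below is a hypothetical stand-in for the SDK type named in the error, and both helper functions are illustrative names only.

```go
package main

import (
	"fmt"
	"reflect"
)

// StepOutputSchema is a hypothetical stand-in for the SDK type in the error
// message; the real type has different fields.
type StepOutputSchema struct {
	ID string
}

// canConvertDirectly reports whether reflect would allow converting v to
// map[string]any as a whole. Calling Convert when this is false panics.
func canConvertDirectly(v any) bool {
	target := reflect.TypeOf(map[string]any{})
	return reflect.ValueOf(v).CanConvert(target)
}

// toInterfaceMap builds a map[string]any by copying each entry, which is
// allowed because every individual value is assignable to any.
func toInterfaceMap(in map[string]*StepOutputSchema) map[string]any {
	out := make(map[string]any, len(in))
	for key, value := range in {
		out[key] = value
	}
	return out
}

func main() {
	typed := map[string]*StepOutputSchema{"success": {ID: "IndexObject"}}

	// The map types have different underlying types, so a direct
	// conversion is rejected.
	fmt.Println("direct conversion allowed:", canConvertDirectly(typed))

	// Element-wise copying works.
	converted := toInterfaceMap(typed)
	fmt.Println("entries after element-wise copy:", len(converted))
}
```

If the SDK performs a whole-map conversion like the one rejected here, that would be consistent with the error, but this is an assumption and has not been checked against the SDK source.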

To reproduce

The WIP workflow that has this configuration is here: https://gitlab.com/redhat/edge/tests/perfscale/arcaflow-workflow-sysbench/-/tree/new-opensearch-plugin?ref_type=heads

mfleader commented 3 months ago

Minimal reproducible example

subwf-1.yaml

version: v0.2.0
input:
  root: SubRootObject1
  objects:
    SubRootObject1:
      id: SubRootObject1
      properties:
        name:
          type:
            type_id: string
steps:
  example1:
    plugin:
      src: quay.io/arcalot/arcaflow-plugin-template-python
      deployment_type: image
    input:
      name: !expr $.input.name
outputs:
  success:
    name: !expr $.steps.example1.outputs.success.message

workflow.yaml

version: v0.2.0
input:
  root: RootObject
  objects:
    RootObject:
      id: RootObject
      properties: {}
steps:
  subwf_1:
    kind: foreach
    items:
      - name: "john"
      - name: "paul"
      - name: "george"
      - name: "ringo"
    workflow: subwf-1.yaml
  subwf_2:
    kind: foreach
    items: !expr $.steps.subwf_1.outputs.success.data
    workflow: subwf-1.yaml
  subwf_3:
    kind: foreach
    items: !expr $.steps.subwf_2.outputs.success.data
    workflow: subwf-1.yaml
outputs:
  success:
    step_3: !expr $.steps.subwf_3.outputs.success

config.yaml

deployers:
  image:
    deployer_name: docker
log:
  level: debug
logged_outputs:
  error:
    level: error

input.yaml

{}

output error

Unexpected error: invalid workflow (
  Validation failed for 'outputSchema': 
    Field cannot be set (
      reflect.Value.Convert: value of type map[string]*schema.StepOutputSchema cannot be converted 
      to type map[string]interface {}))