argoproj / argo-workflows

Workflow Engine for Kubernetes
https://argo-workflows.readthedocs.io/
Apache License 2.0

Override artifact argument when calling a workflow template #3731

Closed: dynamicmindset closed this issue 4 years ago

dynamicmindset commented 4 years ago

Summary

I am working on multiple ETL pipelines that need to support both processing and reprocessing of input data.

Motivation

Here are the use cases:

I. Extract data and process it

II. Reprocess existing data

EDIT - If you have a similar use case, you can stop reading here and check the solution in the comment below.

Tentative implementation

I think artifact passing could be used for the first use case, but it does not apply to the second one (reprocessing). What I've tried is using a workflow template (T) that takes the artifact as an argument.

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: transform-template
spec:
  entrypoint: T

  templates:
    - name: T
      dag:
        tasks:
          - name: apply-transformations
            template: transform
            arguments:
              artifacts:
                - name: input-data
#                  raw:
#                    data: |
#                      test data line 1
#                      test data line 2
                  s3:
                    endpoint: s3.amazonaws.com
                    bucket: <bucket-name>
                    region: eu-west-1
                    key: pipeline/test-data.out
                    accessKeySecret:
                      name: s3-credentials
                      key: accessKey
                    secretKeySecret:
                      name: s3-credentials
                      key: secretKey

    - name: transform
      container:
        image: <image>
        imagePullPolicy: Always
        command: <command>
      inputs:
        artifacts:
          - name: input-data
            path: /tmp/input.in
      outputs:
        artifacts:
          - name: transform-output
            path: /tmp/output.out

---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: reprocess-workflow-
spec:
  entrypoint: reprocess
  templates:
    - name: reprocess
      dag:
        tasks:
          - name: process
            templateRef:
              name: transform-template
              template: T
            arguments:
              artifacts:
                - name: input-data
                  s3:
                    endpoint: s3.amazonaws.com
                    bucket: <bucket>
                    region: eu-west-1
                    key: pipeline/data-method-a.in
                    accessKeySecret:
                      name: s3-credentials
                      key: accessKey
                    secretKeySecret:
                      name: s3-credentials
                      key: secretKey

Looking at the init container, the artifact is not overridden; the one specified in the WorkflowTemplate is always downloaded. As a workaround I am considering passing the S3 key as a parameter argument, but this would couple the workflow to S3 (see the sketch below).
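For reference, a sketch of that workaround: the object key is supplied as a regular input parameter and templated into the artifact definition on the leaf template. The input-key parameter name is made up here, and note that the S3 endpoint, bucket, and credentials stay hard-coded in the template, which is exactly the coupling mentioned above.

    - name: transform
      container:
        image: <image>
        imagePullPolicy: Always
        command: <command>
      inputs:
        parameters:
          # hypothetical parameter carrying the S3 object key
          - name: input-key
        artifacts:
          - name: input-data
            path: /tmp/input.in
            s3:
              endpoint: s3.amazonaws.com
              bucket: <bucket>
              region: eu-west-1
              key: "{{inputs.parameters.input-key}}"
              accessKeySecret:
                name: s3-credentials
                key: accessKey
              secretKeySecret:
                name: s3-credentials
                key: secretKey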

I didn't find any example combining templateRef with artifacts as arguments, but I am wondering whether this is a supported usage scenario and whether there is an alternative to it. Do you have any advice on how to handle the use cases above?

Update

I managed to pass the artifact to a template (outside of the DAG) that has an input artifact. I could reference this template separately (see the sketch below), but ideally I would be able to use a DAG template in a similar way (only defining the required inputs).
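Referencing that template directly looks roughly like this (a sketch based on the manifests above, pointing templateRef straight at the leaf transform template instead of the DAG template T):

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: reprocess-direct-
spec:
  entrypoint: reprocess
  templates:
    - name: reprocess
      dag:
        tasks:
          - name: process
            templateRef:
              name: transform-template
              # reference the container template directly instead of the DAG template T
              template: transform
            arguments:
              artifacts:
                - name: input-data
                  s3:
                    endpoint: s3.amazonaws.com
                    bucket: <bucket>
                    region: eu-west-1
                    key: pipeline/data-method-a.in
                    accessKeySecret:
                      name: s3-credentials
                      key: accessKey
                    secretKeySecret:
                      name: s3-credentials
                      key: secretKey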

Looking at the workflow template DAG example, the message parameter does not seem to have any effect.

      - name: C
        dependencies: [A]
        templateRef:
          name: workflow-template-inner-dag
          template: inner-diamond
        arguments:
          parameters:
          - name: message
            value: message_here_is_not_passed_further

Is this expected?

dynamicmindset commented 4 years ago

I finally got it: the presented use case is definitely supported. This example was useful.

In case someone else has the same use cases, here is how it can be done:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: transform-template
spec:
  entrypoint: T

  templates:
    - name: T
      inputs:
        artifacts:
          - name: dag-input-data
      dag:
        tasks:
          - name: apply-transformations
            template: transform
            arguments:
              artifacts:
                - name: input-data
                  from: "{{inputs.artifacts.dag-input-data}}"

    - name: transform
      container:
        image: <image>
        imagePullPolicy: Always
        command: <command>
      inputs:
        artifacts:
          - name: input-data
            path: /tmp/input.in
      outputs:
        artifacts:
          - name: transform-output
            path: /tmp/output.out

---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: reprocess-workflow-
spec:
  entrypoint: reprocess
  templates:
    - name: reprocess
      dag:
        tasks:
          - name: process
            templateRef:
              name: transform-template
              template: T
            arguments:
              artifacts:
                - name: dag-input-data
                  s3:
                    endpoint: s3.amazonaws.com
                    bucket: <bucket>
                    region: eu-west-1
                    key: pipeline/data-method-a.in
                    accessKeySecret:
                      name: s3-credentials
                      key: accessKey
                    secretKeySecret:
                      name: s3-credentials
                      key: secretKey

Basically, a new input artifact is declared at the DAG level, and that input is then passed as an argument to a task inside the DAG. The same pattern works for parameters (see the sketch below).
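This declare-then-forward pattern is presumably also why the message in the inner-dag example above had no effect: the referenced DAG template has to declare the parameter as an input before its tasks can consume it. A sketch of the equivalent pattern for parameters (the print leaf template is made up):

    - name: T
      inputs:
        parameters:
          - name: message
      dag:
        tasks:
          - name: print-message
            # "print" is a hypothetical leaf template that takes a message parameter
            template: print
            arguments:
              parameters:
                - name: message
                  value: "{{inputs.parameters.message}}"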

Will close the issue now.