elyra-ai / elyra

Elyra extends JupyterLab with an AI-centric approach.
https://elyra.readthedocs.io/en/stable/
Apache License 2.0

Allow for data exchange between custom KFP components #1761

Closed ptitzler closed 2 years ago

ptitzler commented 3 years ago

Describe the issue

Currently the VPE does not expose KFP component outputs, making it impossible for users to choose them as inputs for subsequent nodes. In this example, get lines produces a file that count lines needs to access.

[image: example pipeline - get lines -> count lines]

A working DSL implementation of this pipeline can be found here: https://github.com/ptitzler/kfp-component-tests/blob/main/pipelines/create_and_run_pipeline-1.py

To Reproduce

Steps to reproduce the behavior:

  1. Register https://raw.githubusercontent.com/ptitzler/kfp-component-tests/main/example-1/component.yaml as a KFP component (Get Lines)
  2. Register https://raw.githubusercontent.com/ptitzler/kfp-component-tests/main/example-2/component.yaml as a KFP component (Count Lines)
  3. Create a pipeline from the two components as shown above.

Expected behavior

DESIGN

==========================================================================================

Motivation

Currently, Elyra supports a handful of sample components from Apache Airflow and Kubeflow Pipelines. These components demonstrate Elyra's ability to use native concepts from each orchestrator; however, a key portion of their functionality is missing: the ability to pass data and/or parameters from one component/operator to another via inputs and outputs.

Considerations

We want to limit the scope of the issue to just the exchange of data between runtime-native components. That is, for the time being, support data exchange between Airflow operators -> Airflow operators and KFP components -> KFP components.

We support both Apache Airflow and Kubeflow Pipelines, but the two runtimes have very different ways of defining inputs and outputs.

Apache Airflow

Apache Airflow uses the concept of XComs, or cross-communication. XComs are small amounts of data that are shared between tasks (nodes). The data is represented by a key-value pair, with the key being a string and the value anything that is JSON-serializable or picklable (via pickle). XComs can be pushed and pulled between tasks and by default are scoped to the DAG run (pipeline run).

XComs are built into Airflow's BaseOperator, so all operators inherit them; they are accessed via the task_instance (ti) object and its xcom_push and xcom_pull helper methods.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Minimal scaffolding so the snippet runs as a standalone DAG file
dag = DAG(dag_id="xcom_example", start_date=datetime(2021, 1, 1))

t1 = BashOperator(
    task_id="t1",
    bash_command='echo "{{ ti.xcom_push(key="k1", value="v1") }}" "{{ ti.xcom_push(key="k2", value="v2") }}"',
    dag=dag,
)
t2 = BashOperator(
    task_id="t2",
    bash_command='echo "{{ ti.xcom_pull(key="k1") }}" "{{ ti.xcom_pull(key="k2") }}"',
    dag=dag,
)
t1 >> t2

Limitations:

Note that there are size limitations to the amount of data that can be passed via XComs. Best practices suggest that objects up to a few MB are OK to pass via XComs, but anything larger should be handled by file path reference (volumes, S3).

Resources: a good guide: https://marclamberti.com/blog/airflow-xcom/

Kubeflow Pipelines

Elyra uses KFP component definitions to determine how it handles inputs and outputs and how data is shared. Inputs and outputs are specified in the component definition under their respective sections and are then used in the implementation with type hints (inputPath, inputValue, outputPath) that describe how each argument should be processed, either by reference (Path) or by value (Value).

name: Truncate file
description: Gets the specified number of lines from the input file.

inputs:
- {name: Input 1, type: String, optional: false, description: 'Data for input 1'}
- {name: Parameter 1, type: Integer, default: '100', optional: true, description: 'Number of lines to keep'}

outputs:
- {name: Output 1, type: String, description: 'Output 1 data.'}

implementation:
  container:
    image: quay.io/ptitzler/kfp-ex-truncate-file@sha256:37e20c5f5daae264a05f7bb595aac19ebd7b045667b7056ba3a13fda1b86746e
    # command is a list of strings (command-line arguments). 
    # The YAML language has two syntaxes for lists and you can use either of them. 
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3, 
      # Path of the program inside the container
      /pipelines/component/src/truncate-file.py,
      --input1-path,
      {inputPath: Input 1},
      --param1, 
      {inputValue: Parameter 1},
      --output1-path, 
      {outputPath: Output 1},
    ]
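For reference, this is roughly how a component definition like the one above is consumed from the KFP v1 Python SDK. This is a minimal sketch: the file path and pipeline name are illustrative, and KFP converts the declared names (Input 1, Parameter 1) into snake_case argument names.

import kfp
from kfp.components import load_component_from_file

# Load the component definition shown above (path is illustrative)
truncate_op = load_component_from_file("component.yaml")

@kfp.dsl.pipeline(name="truncate-example")
def truncate_pipeline():
    # inputValue arguments are passed by value; inputPath/outputPath
    # arguments are materialized as files inside the container by KFP
    truncate_task = truncate_op(input_1="line 1\nline 2\nline 3", parameter_1=2)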

Using the truncate example above:

The following inputs and outputs would appear as properties in the node properties pane in the pipeline editor:

inputs:
- {name: Input 1, type: String, optional: false, description: 'Data for input 1'}
- {name: Parameter 1, type: Integer, default: '100', optional: true, description: 'Number of lines to keep'}

outputs:
- {name: Output 1, type: String, description: 'Output 1 data.'}

Limitations: Best practices indicate that users should limit the amount of data passed by value to 200KB per pipeline run.

Envisioned workflow

Given that the inputs and outputs are all defined prior to submission, we should be able to translate them into the appropriate fields for each runtime in order to properly execute the pipeline.

Example scenarios (Cn = <component N>, On = <output N>, In = <input N>)

  • C1() -> C2(I1) => For C2 user can
    • do nothing
    • enter the value of I1
  • C1(O1,O2) -> C2(I1) => For C2 user can
    • do nothing
    • enter the value of I1
    • select O1 or O2 as the value of I1 (see the sketch after this list)
  • C1(O1,O2) -> C2(I1,O3) -> C3(I2) => For C3 user can
    • do nothing
    • enter the value of I2
    • select O1 or O2 as the value of I2
    • select O3 as the value of I2
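In KFP v1 DSL terms, the "select O1 or O2 as the value of I1" option amounts to wiring a named task output into a downstream task. A minimal sketch, with components loaded as in the earlier example and all names illustrative:

# c1_op exposes outputs O1 and O2; c2_op takes input I1
task1 = c1_op()
# Select O1 as the value of I1; task outputs are keyed by (sanitized) output name
task2 = c2_op(i1=task1.outputs["o1"])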

Example: [image]

==========================================================================================

KFP Implementation

Expectations after completion:

Open Discussions:

AA Implementation

Expectations after completion:

ptitzler commented 3 years ago

Example scenarios (Cn = <component N>, On = <output N>, In = <input N>)

kevin-bates commented 3 years ago

Thank you - this is helpful. I see that you added 'do nothing' to the first case - which was my question after reading the email-notification (original) version.

So it seems our general idea of "all outputs are available as inputs" will need much greater scrutiny since all inputs must be explicitly declared.

Also wondering how wild-carding would work. I suppose inputPath could be a file containing a set of other references. This would only apply to our built-in components of course. (I actually thought this comment was on #1765, so I apologize for the cross-posting kind of thing.)

ptitzler commented 3 years ago

I see that you added 'do nothing' to the first case - which was my question after reading the email-notification (original) version.

I added this to cover optional input parameters.

So it seems our general idea of "all outputs are available as inputs" will need much greater scrutiny since all inputs must be explicitly declared. ...

Let's move this to https://github.com/elyra-ai/elyra/issues/1765 to separate the specific requirements for generic components since they appear to be a lot more complex.

I should add that we still need to investigate for this issue how the Airflow data exchange works. @akchinSTC ?

ptitzler commented 3 years ago

Worthwhile reads:

ajbozarth commented 3 years ago

UI Design based on Webex discussion, @akchinSTC will post the Server side of the design in a separate comment.

  1. We will add a button next to any node property that is in the list of inputs for that nodeDef.
  2. The button will make a call to a new API that @akchinSTC will be making; it will send the current pipeline json and the nodeId for the current node (that has properties open) and will return a dict. The dict will include the parent node names for the current node and a list of each of their outputs. (A rough sketch of this server-side lookup follows the list.)
  3. Then a dialog will open with a dropdown containing the parent node names; once a parent node is selected, a second dropdown will appear/become interactive that includes all the outputs for that parent node.
  4. When a user selects an output for a parent node and confirms the dialog, the value of that property will be updated to parentNodeName.outputs['outputName'], which can be submitted to the backend as is. The user can always edit the value to something else later if they want.
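A minimal sketch of what such an endpoint might compute, assuming the pipeline JSON follows the pipeline-flow schema (nodes with inputs[].links[].node_id_ref); the function name and the component_outputs field are assumptions, not an existing Elyra API:

def get_parent_outputs(pipeline: dict, node_id: str) -> dict:
    """Map each direct parent's node name to a list of that parent's outputs."""
    nodes = {n["id"]: n for n in pipeline["pipelines"][0]["nodes"]}
    parents = {}
    # Each input link references the upstream node it connects from
    for port in nodes[node_id].get("inputs", []):
        for link in port.get("links", []):
            parent = nodes[link["node_id_ref"]]
            label = parent.get("app_data", {}).get("label") or parent["id"]
            # component_outputs would be populated from the component registry
            parents[label] = parent.get("app_data", {}).get("component_outputs", [])
    return parents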
kevin-bates commented 3 years ago

UI Design based on Webex discussion, @akchinSTC will post the Server side of the design in a separate comment.

  1. We will add a button next to any node property that is in the list of inputs for that nodeDef.
  2. The button will make a call to a new API that @akchinSTC will be making; it will send the current pipeline json and the nodeId for the current node (that has properties open) and will return a dict. The dict will include the parent node names for the current node and a list of each of their outputs.

This response should also include the node-id mappings to each of the ancestor node-names - is that right? Since the frontend doesn't have this mapping and only works from node-ids, this seems useful.

This request to the server doesn't need to be performed per node property, just per node since other properties of that node will leverage the same information.

  1. Then a dialog will open with a dropdown containing the parent node names; once a parent node is selected, a second dropdown will appear/become interactive that includes all the outputs for that parent node.

I guess I would have thought the property "button" would enable an "output selector mode" where the user could then go select an ancestor node, open its output properties and select one to associate as input on the current node. Rather than bring every ancestor node's properties into dropdown dialogs. This is where having the ancestor node-ids would be helpful. Since this is more about UX I'll leave things to the experts.

  1. When a user selects an output for a parent node and confirms the dialog, the value of that property will be updated to parentNodeName.outputs['outputName'], which can be submitted to the backend as is. The user can always edit the value to something else later if they want.

Does this approach make the parent node name a "keyword" (more or less) that cannot be used as a value otherwise? How does the backend distinguish between a raw value and a referenced value (i.e., an ancestor node's output)?

Another issue is that the node-name uniquefication (today) happens at submission time. This logic consists of appending an incremented value to the end of non-unique node names. As a result, these "unique" names would not be durable unless the entire pipeline is also returned (with the updated node names). Not doing that introduces the possibility of users "inserting" new ancestor nodes with non-unique names, thereby messing up existing output-to-input mappings. So, we'd now need to include the (potentially updated) pipeline definition in the response as well.
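(For illustration, the uniquefication logic described above is roughly the following; the exact suffix format is an assumption:)

def uniquify(names):
    # Append an incremented value to the end of non-unique node names
    seen, result = {}, []
    for name in names:
        if name in seen:
            seen[name] += 1
            result.append(f"{name}_{seen[name]}")  # suffix format assumed
        else:
            seen[name] = 0
            result.append(name)
    return result

# uniquify(["load", "train", "load"]) -> ["load", "train", "load_1"]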

I think we can eliminate the need for this service entirely if we do the following:

  1. Write a front-end function to return a list of ancestor node ids from a given node (id).
  2. Treat a node's input property as two pieces: the ancestor node's output name and its node id. The id piece only applies to referenced inputs (see the sketch after this list).
  3. The displayed value should reflect the ancestor node's label and convey its applicable output name, but the rigid format of parentNodeName.outputs['outputName'] is something the server should produce - especially since that could change and will vary between platforms. (Perhaps an icon or link to the ancestor node could be reflected in such values.)
  4. The server could distinguish raw inputs from referenced inputs by the presence of the ancestor node id (for referenced inputs).
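A rough sketch of points 1 and 2 (the front end is TypeScript, but Python is used here to stay consistent with the other sketches; the function name, the two-piece structure, and the pipeline JSON keys are all assumptions):

def ancestor_ids(pipeline: dict, node_id: str) -> set:
    """Point 1: collect the ids of all upstream (ancestor) nodes of a node."""
    nodes = {n["id"]: n for n in pipeline["pipelines"][0]["nodes"]}
    seen, stack = set(), [node_id]
    while stack:
        node = nodes[stack.pop()]
        for port in node.get("inputs", []):
            for link in port.get("links", []):
                parent_id = link["node_id_ref"]
                if parent_id not in seen:
                    seen.add(parent_id)
                    stack.append(parent_id)
    return seen

# Point 2 (and point 4): a referenced input carries the ancestor's output name
# plus its node id; a raw input carries only a value, which is how the server
# can tell the two apart.
referenced_input = {"value": "Output 1", "node_id": "a1b2c3d4"}
raw_input = {"value": "/data/input.csv"}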

We should also require the front-end to enforce node name uniqueness. For example, if the same notebook is used for two nodes, the creation of the second node should trigger a UI error requesting the node label be changed. Enforcing this uniqueness would then allow for better UX when it comes to conveying the ancestor node when referenced in an input property's value. Same goes for when a duplicated KFP or Airflow component is placed on the canvas.

Although the context of this is KFP-only, we could apply this output-to-input mapping to the Generic and Airflow platforms as well.

Thoughts?

akchinSTC commented 3 years ago

@lresende - had some discussion with Kiersten at the sync-up and wanted to get your thoughts:

L511-514 in the kfp processor - we have a custom type file that we are using to read the contents of a file into inputPath (papermill and filter text components); this is why these two components work as is. I feel that an inputPath should reference an output of a previous step, but in actuality, if a string is given in its place, it will be used literally as the value. If this is the behavior in KFP (though I don't agree with it), we will need to support inputPath as both a reference and a value in Elyra.
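In KFP v1 terms, both of the following are accepted for an inputPath argument, which is the dual reference/value behavior being described (a sketch; names follow the truncate example above, and output keys are the sanitized output names):

# Reference: wire an upstream task's output file into the input path
t2 = truncate_op(input_1=t1.outputs["output_1"], parameter_1=5)
# Literal: KFP stores the constant string and mounts it as the input file
t3 = truncate_op(input_1="literal text used as the file contents", parameter_1=5)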

Supporting inclusion of large local dependencies in a pipeline run with custom components: passing of information (parameters, dependencies local to the user workspace) to the pipeline should only be done via inputValue, but there are limits to how much you should pass without incurring performance/runtime issues. How best to deal with including local artifacts totaling >200KB per run?

akchinSTC commented 3 years ago

After discussions, we should not introduce any behaviors or otherwise that differ from the supported behavior in KFP.

The ability to include files that are local to the Elyra workspace in the pipeline is out of scope for this issue but will be addressed in #1765.

akchinSTC commented 3 years ago

UI Design based on Webex discussion, @akchinSTC will post the Server side of the design in a separate comment.

  1. We will add a button next to any node property that is in the list of inputs for that nodeDef.
  2. The button will make a call to a new API that @akchinSTC will be making; it will send the current pipeline json and the nodeId for the current node (that has properties open) and will return a dict. The dict will include the parent node names for the current node and a list of each of their outputs.
  3. Then a dialog will open with a dropdown containing the parent node names; once a parent node is selected, a second dropdown will appear/become interactive that includes all the outputs for that parent node.
  4. When a user selects an output for a parent node and confirms the dialog, the value of that property will be updated to parentNodeName.outputs['outputName'], which can be submitted to the backend as is. The user can always edit the value to something else later if they want.

Barring the subject of a new service API for this function and instead heading down the path of letting the front end determine the outputs available to specific nodes...

  1. Given each node created has a unique node ID, these should be used to reference other nodes in the raw pipeline file. More than one instance of the same component can exist in the same pipeline, hence the use of nodeIDs, but they will all share the same definition if they are the same component.

  2. In the visual pipeline editor, displaying a nodeID in the node properties panel is not the best idea; we should instead use the friendlier node name/label or some other intuitive reference. BAD: [image]

    • We should also require the front-end to enforce node name uniqueness. For example, if the same notebook is used for two nodes, the creation of the second node should trigger a UI error requesting the node label be changed. Enforcing this uniqueness would then allow for better UX when it comes to conveying the ancestor node when referenced in an input property's value. Same goes for when a duplicated KFP or Airflow component is placed on the canvas. @kevin-bates alluded to this earlier. If we do use the node name/label, we will need to ensure name/label uniqueness is taken into consideration during pipeline construction and not post-submission.

  3. The number of inputs and outputs and their respective names for a component are provided via the component registry service. The front-end should (correct me if I'm wrong) have enough information to include in the json payload to the processor a unique link (using the term loosely) from one component to another.

    • This info will be put in the pipeline payload under each respective component and each of its parameters defined as an 'inputPath'. How should it be structured? At minimum, it should contain:
    • nodeID of the node that contains the output requested
    • name of the output itself e.g. 'Data' or 'File'
  4. When a component parameter (that is defined as an inputPath) references another node's outputs, the current implementation of linking one node to another in the pipeline editor is no longer strictly necessary, since this creates an implicit connection once the pipeline is compiled.

    • We specifically drag a link from one node to another to define a connection between them to ensure that outputs from the parent node are provided for the input node. When we implement this new way of linking inputs to outputs, this step becomes unnecessary between kfp specific components since the kfp compiler will do this for us.
    • We should avoid this; as @ptitzler pointed out in https://github.com/elyra-ai/elyra/issues/1761#issuecomment-908051130, moving forward we should only allow outputs from parent nodes explicitly linked in the VPE, both to avoid issues like this and to keep the graph in the VPE as close as possible to the one in the KFP UI.
kevin-bates commented 3 years ago

We specifically drag a link from one node to another to define a connection between them to ensure that outputs from the parent node are provided for the input node. When we implement this new way of linking inputs to outputs, this step becomes unnecessary between kfp specific components since the kfp compiler will do this for us.

While logically unnecessary, the process of linking nodes explicitly is still intuitive. Are you suggesting that the graph be built purely based on which inputs are linked to what outputs?

I'm curious about the following.

Today (for generic components), all outputs of ancestor (upstream) nodes are available as inputs to descendant (downstream) nodes and, because we (Elyra) build the pipeline in that manner (not sure this is 100% correct), the "diagram" displayed in Kubeflow and Airflow essentially depicts the same pattern as viewed in the Elyra visual pipeline editor (VPE).

For custom (non-generic) components, inputs of descendant (downstream) nodes must be explicitly tied to outputs of ancestor (upstream) nodes (when the input is not a user-entered value). So, while we might still allow for the explicit linkage of nodes via the VPE, will the "diagram" displayed in Kubeflow or Airflow no longer depict the same pattern as viewed in the Elyra VPE when, say, the following happens:

VPE depiction:

Node A -> Node B -> Node C

Node B has inputs tied to Node A and Node C has inputs tied to Node A, but not Node B

Is the KFP depiction like the following, where Node C could be processed before Node B finishes?

Node A +-> Node B
       +-> Node C

or will KFP still honor the VPE depiction and essentially defer the processing of Node C until Node B has finished? I'm sure this is a function of how the pipeline is built in terms of the components, but I also suspect we don't have that level of control - which I believe is what @akchinSTC is driving at.

If the former, should the VPE generate the appropriate depiction of the graph?

akchinSTC commented 3 years ago

While logically unnecessary, the process of linking nodes explicitly is still intuitive. Are you suggesting that the graph be built purely based on which inputs are linked to what outputs?

Not at all; I agree linking nodes explicitly is still intuitive and necessary and should be used - just something to keep in mind for the near future. We may need to document this behavior somewhere: when the user links two nodes together via the properties inputs and outputs but doesn't link those two nodes together via the "drag and connect", the pipeline will still work, but it won't display the connection in the UI, leading to some confusion.

or will KFP still honor the VPE depiction and essentially defer the processing of Node C until Node B has finished?

I am 98.5325% sure that Kubeflow will defer the processing of Node C until all the inputs it requires (meaning all parent-node output providers) have run to completion.

kevin-bates commented 3 years ago

or will KFP still honor the VPE depiction and essentially defer the processing of Node C until Node B has finished?

I am 98.5325% sure that Kubeflow will defer the processing of Node C until all the inputs it requires (meaning all parent-node output providers) have run to completion.

OK, but my example is that Node C only depends on Node A (via inputs) despite the user's expectation for Node C to "wait for" Node B, although there's nothing Node C needs from Node B. So it seems like KFP will not honor the VPE depiction of the pipeline and, instead, produce the true flow. Are you saying there's some implicit way to make Node C wait for Node B besides node-parameter dependencies?

akchinSTC commented 3 years ago

So it seems like KFP will not honor the VPE depiction of the pipeline and, instead, produce the true flow.

ahh, I think I understand what you're driving at now. Yes, the depiction will differ from the VPE to the KFP UI in that KFP will produce the "true flow".

A user could create a pipeline in the VPE that resembles the "true flow" which would be equivalent to what you had in your second diagram. But the first diagram would be valid as well if we follow the behavior we have today regarding dependency availability.

Node A +-> Node B
       +-> Node C
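(For reference, the KFP v1 DSL does offer an explicit ordering handle that is independent of data dependencies, so a user could force the first behavior; a sketch, with illustrative names:)

task_a = comp_a()
task_b = comp_b(data=task_a.outputs["out"])
task_c = comp_c(data=task_a.outputs["out"])
# Force Node C to wait for Node B even though no data flows between them
task_c.after(task_b)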
ptitzler commented 3 years ago

[We may need to just document this behavior somewhere] so when the user links two nodes together via the properties inputs and outputs, but doesn't link those two nodes together via the "drag and connect",

I don't think this should be allowed because it would unnecessarily complicate things for the user and the VPE implementation:

akchinSTC commented 3 years ago

Current response when querying the component properties registry API:

{
    "current_parameters": {
        "label": "",
        "component_source": "kfp/run_notebook_using_papermill.yaml",
        "elyra_notebook": "",
        "elyra_parameters": "{}",
        "elyra_packages_to_install": "[]",
        "elyra_input_data": ""
    },
    "parameters": [
        {
            "id": "label"
        },
        {
            "id": "component_source"
        },
        {
            "id": "elyra_notebook"
        },
        {
            "id": "elyra_parameters"
        },
        {
            "id": "elyra_packages_to_install"
        },
        {
            "id": "elyra_input_data"
        }
    ],
    "uihints": {
        "id": "nodeProperties",
        "parameter_info": [
            {
                "parameter_ref": "label",
                "control": "custom",
                "custom_control_id": "StringControl",
                "label": {
                    "default": "Label"
                },
                "description": {
                    "default": "A custom label for the node.",
                    "placement": "on_panel"
                },
                "data": {}
            },
            {
                "parameter_ref": "component_source",
                "control": "readonly",
                "label": {
                    "default": "Component Source"
                },
                "description": {
                    "default": "The path to the component specification file.",
                    "placement": "on_panel"
                },
                "data": {}
            },
            {
                "parameter_ref": "elyra_notebook",
                "control": "custom",
                "custom_control_id": "StringControl",
                "label": {
                    "default": "Notebook"
                },
                "description": {
                    "default": "Required. Notebook to execute. (type: JupyterNotebook)",
                    "placement": "on_panel"
                },
                "data": {
                    "format": "file",
                    "required": true
                }
ptitzler commented 3 years ago

Is the above pipeline snippet a proposal or a recap of what the current structure looks like?

kiersten-stokes commented 3 years ago

Is the above pipeline snippet a proposal or a recap of what the current structure looks like?

The JSON is the current structure and the bullet points are the proposed changes

akchinSTC commented 3 years ago

Proposed changes: (note: sampled, not complete)

{
    "current_parameters": {
        "label": "",
        "component_source": "kfp/run_notebook_using_papermill.yaml",
        "elyra_notebook": "",
        "elyra_parameters": "{}",
        "elyra_packages_to_install": "[]",
        "elyra_input_data": ""
        "elyra_notebook_1": "",       #Output   # Duplicate IDs, whether with inputs or other outputs will be postfix'ed 
        "elyra_output_data": "",      #Output  
    },
    "parameters": [
        {
            "id": "label"
        },
        {
            "id": "component_source"
        },
        {
            "id": "elyra_notebook"
        },
        {
            "id": "elyra_parameters"
        },
        {
            "id": "elyra_packages_to_install"
        },
        {
            "id": "elyra_input_data"
        },
        {
            "id": "elyra_notebook_1".         # id must match all three areas in current parameters, parameters and ui_hints
        },
        {
            "id": "elyra_output_data"
        }
    ],
    "uihints": {
        "id": "nodeProperties",
        "parameter_info": [
            {
                "parameter_ref": "label",
                "control": "custom",
                "custom_control_id": "StringControl",
                "label": {
                    "default": "Label"
                },
                "description": {
                    "default": "A custom label for the node.",
                    "placement": "on_panel"
                },
                "data": {}
            },
            {
                "parameter_ref": "component_source",
                "control": "readonly",
                "label": {
                    "default": "Component Source"
                },
                "description": {
                    "default": "The path to the component specification file.",
                    "placement": "on_panel"
                },
                "data": {}
            },
            {
                "parameter_ref": "elyra_notebook",
                "control": "custom",
                "custom_control_id": "EnumControl",    # changed to EnumControl for dropdown selection
                "label": {
                    "default": "Notebook"
                },
                "description": {
                    "default": "Required. Notebook to execute. (type: JupyterNotebook)",
                    "placement": "on_panel"
                },
                "data": {
                    "format": "",              # TBD
                    "type": "inputPath",      # additional type info added
                    "required": true
                }
            },
            {
                "parameter_ref": "elyra_parameters",
                "control": "custom",
                "custom_control_id": "StringControl",
                "label": {
                    "default": "Parameters"
                },
                "description": {
                    "default": "Map with notebook parameter values. (type: JsonObject)",
                    "placement": "on_panel"
                },
                "data": {
                    "format": "string",
                    "type": "inputValue",       # additional type info added
                    "required": true
                }
            },
           {
                "parameter_ref": "elyra_notebook_1",       # output parameter
                "control": "readonly",
                "label": {
                    "default": "Notebook"
                },
                "description": {
                    "default": "Executed notebook.",
                    "placement": "on_panel"
                },
                "data": {
                   "type": "outputPath    # additional type info added    
                }
            },
           {
                "parameter_ref": "elyra_output_data",     # output parameter
                "control": "readonly",
                "label": {
                    "default": "Output data"
                },
                "description": {
                    "default": "Directory with any output data. In notebook, the OUTPUT_DATA_PATH variable will point to this directory, so that the notebook can write output data there.",
                    "placement": "on_panel"
                },
                "data": {
                   "type": "outputPath        # additional type info added   
                }
            },
kevin-bates commented 3 years ago

If I'm following things, the JSON above should adhere to this schema: https://github.com/elyra-ai/pipeline-schemas/blob/master/common-canvas/parameter-defs/parameter-defs-v3-schema.json

If that's true, then the "parameters" section adheres to this schema: https://github.com/elyra-ai/pipeline-schemas/blob/master/common-pipeline/operators/operator-v3-schema.json#/properties/parameters where "id" and any-of "type" or "enum" are required.

"uihints" follows this schema: https://github.com/elyra-ai/pipeline-schemas/blob/master/common-pipeline/operators/uihints-v3-schema.json

and "current_parameters" is essentially free-form - so we could do whatever here.

So it looks like only "parameters" is non-conformant (in that the example is missing required properties).

ajbozarth commented 3 years ago

@lresende

@akchinSTC suggested I share my "bad" backup UI idea to get another opinion. What if, instead of a conditional nested dropdown (i.e., a second dropdown whose values depend on the selection of the first), we "flatten" it to a single dropdown where the values look like "parent name: output name"? As far as UI goes it would be easier to implement, since we could leverage our current enum controller, but at the same time a large amount of the code would be thrown out if we change our minds later and want to go with the conditional nested enum version.

Personally I'm not a fan because the flattened approach doesn't scale well for UX, but the idea crossed my mind so I figured I'd share it.