argoproj-labs / hera

Hera is an Argo Python SDK. Hera aims to make construction and submission of various Argo Project resources easy and accessible to everyone! Hera abstracts away low-level setup details while still maintaining a consistent vocabulary with Argo. ⭐️ Remember to star!
https://hera.rtfd.io
Apache License 2.0

JSON parsing for inline scripts needs custom processing in certain situations (fan-out/in) #1146

Open ljyanesm opened 1 month ago

ljyanesm commented 1 month ago

Pre-bug-report checklist

1. This bug can be reproduced using pure Argo YAML

If yes, it is more likely to be an Argo bug unrelated to Hera. Please double check before submitting an issue to Hera.

2. This bug occurs in Hera when...

Handling JSON-serialised objects generated by Argo Workflows using the json.loads call included at the beginning of all Scripts

Bug report

Describe the bug A clear and concise description of what the bug is:

When using nested fan-in patterns on Script tasks, the JSON-parsing code introduced to handle Argo parameters can fail, see:

try:
    message = json.loads(r'''{{inputs.parameters.in_param}}''')
except:
    message = r'''{{inputs.parameters.in_param}}'''

Unfortunately, this results in broken behaviour if one uses a nested fan-out and fan-in pattern. In these cases the generated Argo YAML isn't aware of the multiple levels of serialisation, so one ends up with a top-level JSON object that contains JSON that has been stringified.

For example, consider a nested fan-out/fan-in with 2 items and 2 sub-items each, from which one would like to receive the data structure below back during the final fan-in.

[["item_0_subitem_0", "item_0_subitem_1"], ["item_1_subitem_0", "item_1_subitem_1"]]

In practice one receives the two-item list below, where each item is a stringified version of the nested list.

["[\"\\"item_0_subitem_0\\"\",\"\\"item_0_subitem_1\\"\"]","[\"\\"item_1_subitem_0\\"\",\"\\"item_1_subitem_1\\"\"]"] The defect is that the content has been serialised twice, but only deserialised once. To fix this one needs a way to deserialise, then iterate over all the items in the data structure and deserialise those.

Error log if applicable:

There is no immediate error apart from the inconvenience of having to handle a partially deserialised JSON object.

To Reproduce Full Hera code to reproduce the bug:

from hera.workflows import Parameter, Steps, Workflow, script
from hera.workflows.models import ValueFrom

PROCESS_INNER = "process-inner"

@script(outputs=Parameter(name="items", value_from=ValueFrom(path="/tmp/items.json")))
def generate_outer_items():
    import json
    json.dump([f"item_{i}" for i in range(2)], open("/tmp/items.json", "w"))

@script(outputs=Parameter(name="processed_item", value_from=ValueFrom(path="/tmp/processed_item.json")))
def generate_inner_items(item: str, same_value: str):
    import json
    json.dump([(f"{item}_subitem_{i}", "outer") for i in range(2)], open("/tmp/processed_item.json", "w"))

@script(name=PROCESS_INNER, outputs=Parameter(name="processed_subitem", value_from=ValueFrom(path="/tmp/processed_subitem.json")))
def inner(subitem: str):
    import json
    json.dump((subitem, "test"), open("/tmp/processed_subitem.json", "w"))

@script()
def fan_in(all_subitem_list):
    import json
    # When passing parameters via Argo, these have to be JSON serialised. Hera manages deserialisation automatically,
    # unfortunately, in this example and other cases where multiple fan-out layers collapse on a single fan-in, the JSON
    # deserialisation is not aware of the workflow's topology and can result in partially deserialised objects.
    # The decode function is used to handle these partially deserialised objects.
    def decode(jo):
        def decode_inner(o):
            try:
                d = json.loads(o)
            except (json.JSONDecodeError, TypeError):
                return o
            if isinstance(d, str):
                return decode_inner(d)
            elif isinstance(d, list):
                return [decode_inner(i) for i in d]
            elif isinstance(d, dict):
                return {k: decode_inner(v) for k, v in d.items()}
            else:
                return d
        return decode_inner(json.dumps(jo))
    print("raw:")
    print(all_subitem_list)
    print("decoded:")
    print(decode(all_subitem_list))

with Workflow(
    generate_name="fan-out-fan-in-nested-",
    entrypoint="main",
) as wf:
    with Steps(
        name="outer",
        inputs=[Parameter(name="input_item"), Parameter(name="same_value")],
        outputs=[
            Parameter(
                name="processed_subitems",
                value_from=ValueFrom(parameter="{{steps.%s.outputs.parameters.processed_subitem}}" % PROCESS_INNER),
            )
        ],
    ) as outer:
        inner_items = generate_inner_items(
            arguments={"item": outer.get_parameter("input_item"), "same_value": "same_for_all"},
        )
        processed_inner_item = inner(
            arguments={"subitem": "{{item}}"},
            with_param=inner_items.get_parameter("processed_item")
        )
    with Steps(name="main"):
        outer_items = generate_outer_items()
        processed_outer_items = outer(
            arguments={"input_item": "{{item}}", "same_value": "same_for_all"},
            with_param=outer_items.get_parameter("items")
        )
        # Note: This is a single fan-in for both fan-out levels (outer, inner), see the `decode` function for how to handle
        # the JSON deserialisation
        fan_in(
            arguments={"all_subitem_list": processed_outer_items.get_parameter("processed_subitems")}
        )
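The decode helper from the reproduction above can also be exercised on its own; the payload below mimics a doubly-serialised fan-in input (clean escaping assumed, unlike the garbled output Argo actually emits):

```python
import json

def decode(jo):
    # Recursively deserialise any JSON strings left inside the object.
    def decode_inner(o):
        try:
            d = json.loads(o)
        except (json.JSONDecodeError, TypeError):
            return o
        if isinstance(d, str):
            return decode_inner(d)
        elif isinstance(d, list):
            return [decode_inner(i) for i in d]
        elif isinstance(d, dict):
            return {k: decode_inner(v) for k, v in d.items()}
        else:
            return d
    return decode_inner(json.dumps(jo))

# A doubly-serialised payload: each element is itself a JSON string.
payload = [
    json.dumps(["item_0_subitem_0", "item_0_subitem_1"]),
    json.dumps(["item_1_subitem_0", "item_1_subitem_1"]),
]
print(decode(payload))
# → [['item_0_subitem_0', 'item_0_subitem_1'], ['item_1_subitem_0', 'item_1_subitem_1']]
```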

Expected behavior A clear and concise description of what you expected to happen:

The input object to fan_in should be a valid Python object without any remnant JSON strings embedded in it.

Environment

Additional context Please note the decode(jo) function, which could be used as an alternative to the original json.loads(r'''{{inputs.parameters.in_param}}''') to handle this issue.

elliotgunton commented 1 month ago

Thanks @ljyanesm! This is definitely on our radar - I think #827 is a similar problem with escaping the nested JSON. I think we'd need a custom decoder like the one you've written in decode(jo), as Argo itself garbles the JSON that it exports into the output parameters.

elliotgunton commented 1 week ago

Hey @ljyanesm - turns out this was a bug upstream - https://github.com/argoproj/argo-workflows/issues/13510 - with a PR that's now been merged! I think I'll leave this open until it's actually released

ljyanesm commented 1 week ago

Thank you @elliotgunton!

This is really great, looking forward to these changes.