aws / aws-step-functions-data-science-sdk-python

Step Functions Data Science SDK for building machine learning (ML) workflows and pipelines on AWS
Apache License 2.0

Execution inputs as container arguments for processing jobs #197

Open · francescocamussoni opened 1 year ago

francescocamussoni commented 1 year ago

I'm trying to use execution inputs as container arguments for my processing job:

execution_input = ExecutionInput(
    schema={
        "IngestaJobName": str,
        "PreprocessingJobName": str,
        "InferenceJobName": str,
        "Fecha": str,
    }
)
# Call step
ingesta_step = ProcessingStep(
    inference_config["ingesta_step_name"],
    processor=ingesta_processor,
    job_name=execution_input['IngestaJobName'],
    inputs=inputs_ingesta,
    outputs=outputs_ingesta,
    container_arguments=["--fecha", "$$.Execution.Input['Fecha']"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/"+inference_config["ingesta_function"]], 
)

I've also tried replacing container_arguments with ["--fecha", execution_input["Fecha"]].

Neither approach works.

Use Case

When I launch a new execution of my state machine, it would be useful to pass some execution inputs through as container arguments, so that parameters of interest that define the behaviour of a step can be set directly by the execution input, without updating the state machine definition.
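In raw Amazon States Language terms, the request amounts to the second fragment below. Step Functions only resolves values at runtime for fields whose key ends in .$, so the CreateProcessingJob state would need something like this. This is a hand-written sketch using the documented States.Array intrinsic, not something the SDK emits today:

# What the SDK emits today: the JSONPath string reaches the container verbatim,
# because SageMaker itself never interprets JSONPath expressions.
literal_fragment = {
    "AppSpecification": {
        "ContainerArguments": ["--fecha", "$$.Execution.Input['Fecha']"],
    }
}

# What Step Functions would actually resolve at runtime: the ".$" suffix marks
# the field for evaluation, and the States.Array intrinsic builds the argument
# list from a string literal plus the execution input field.
dynamic_fragment = {
    "AppSpecification": {
        "ContainerArguments.$": "States.Array('--fecha', $$.Execution.Input.Fecha)",
    }
}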


This is a :rocket: Feature Request

wong-a commented 1 year ago

I've also tried replacing container_arguments with ["--fecha", execution_input["Fecha"]]. Neither approach works.

Can you elaborate on the behaviour you are seeing?

francescocamussoni commented 1 year ago

With ["--fecha", execution_input["Fecha"]] I get this error TypeError: Object of type ExecutionInput is not JSON serializable

When I execute

branching_workflow = Workflow(
    name=pipeline_name_step,
    definition=workflow_graph,
    role=role,
    execution_input=execution_input
).create()

This is the complete error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [282], in <cell line: 2>()
      1 # Create or update your StateMachine Workflow
----> 2 branch_workflow = branching_workflow.create()

File /usr/local/lib/python3.8/site-packages/stepfunctions/workflow/stepfunctions.py:205, in Workflow.create(self)
    202     return self.state_machine_arn
    204 try:
--> 205     self.state_machine_arn = self._create()
    206 except self.client.exceptions.StateMachineAlreadyExists as e:
    207     self.state_machine_arn = self._extract_state_machine_arn(e)

File /usr/local/lib/python3.8/site-packages/stepfunctions/workflow/stepfunctions.py:215, in Workflow._create(self)
    212 def _create(self):
    213     response = self.client.create_state_machine(
    214         name=self.name,
--> 215         definition=self.definition.to_json(pretty=self.format_json),
    216         roleArn=self.role,
    217         tags=self.tags
    218     )
    219     logger.info("Workflow created successfully on AWS Step Functions.")
    220     return response['stateMachineArn']

File /usr/local/lib/python3.8/site-packages/stepfunctions/steps/states.py:91, in Block.to_json(self, pretty)
     82 """Serialize to a JSON formatted string.
     83 
     84 Args:
   (...)
     88     str: JSON formatted string representation of the block.
     89 """
     90 if pretty:
---> 91     return json.dumps(self.to_dict(), indent=4)
     93 return json.dumps(self.to_dict())

File /usr/local/lib/python3.8/json/__init__.py:234, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
--> 234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
    238     **kw).encode(obj)

File /usr/local/lib/python3.8/json/encoder.py:201, in JSONEncoder.encode(self, o)
    199 chunks = self.iterencode(o, _one_shot=True)
    200 if not isinstance(chunks, (list, tuple)):
--> 201     chunks = list(chunks)
    202 return ''.join(chunks)

File /usr/local/lib/python3.8/json/encoder.py:431, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
    429     yield from _iterencode_list(o, _current_indent_level)
    430 elif isinstance(o, dict):
--> 431     yield from _iterencode_dict(o, _current_indent_level)
    432 else:
    433     if markers is not None:

File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

    [... skipping similar frames: _make_iterencode.<locals>._iterencode_dict at line 405 (2 times)]

File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

File /usr/local/lib/python3.8/json/encoder.py:325, in _make_iterencode.<locals>._iterencode_list(lst, _current_indent_level)
    323         else:
    324             chunks = _iterencode(value, _current_indent_level)
--> 325         yield from chunks
    326 if newline_indent is not None:
    327     _current_indent_level -= 1

File /usr/local/lib/python3.8/json/encoder.py:438, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
    436         raise ValueError("Circular reference detected")
    437     markers[markerid] = o
--> 438 o = _default(o)
    439 yield from _iterencode(o, _current_indent_level)
    440 if markers is not None:

File /usr/local/lib/python3.8/json/encoder.py:179, in JSONEncoder.default(self, o)
    160 def default(self, o):
    161     """Implement this method in a subclass such that it returns
    162     a serializable object for ``o``, or calls the base implementation
    163     (to raise a ``TypeError``).
   (...)
    177 
    178     """
--> 179     raise TypeError(f'Object of type {o.__class__.__name__} '
    180                     f'is not JSON serializable')

TypeError: Object of type ExecutionInput is not JSON serializable
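The serialization failure reproduces in isolation: indexing an ExecutionInput returns a placeholder object, not a string, and json.dumps has no encoder for it. A minimal sketch, assuming only the SDK's public ExecutionInput class:

import json
from stepfunctions.inputs import ExecutionInput

execution_input = ExecutionInput(schema={"Fecha": str})
placeholder = execution_input["Fecha"]  # a placeholder object, not a str
json.dumps(["--fecha", placeholder])    # raises: Object of type ExecutionInput is not JSON serializable

The SDK only resolves these placeholders in fields it handles explicitly (such as job_name); a plain list like container_arguments is serialized as-is.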

On the other hand, with ["--fecha", "$$.Execution.Input['Fecha']"], I get the literal string $$.Execution.Input['Fecha'] inside the .py of the processing job; it seems the placeholder doesn't work.

[screenshots: the processing script receives the literal string $$.Execution.Input['Fecha'] as its argument]

wong-a commented 1 year ago

Thanks for the additional details. The problem is likely that ProcessingStep passes container_arguments directly to `sagemaker.workflow.airflow#processing_config`:

https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/02ed72b10d7c0dbfb3e404b1e183c309c040bfaa/src/stepfunctions/steps/sagemaker.py#L558-L561

The fix should be similar to https://github.com/aws/aws-step-functions-data-science-sdk-python/pull/155

With some slight differences, because container_arguments is an array instead of an object.
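Presumably the fix would mirror that PR: detect placeholder entries in container_arguments before the config is handed to processing_config, and emit a ContainerArguments.$ field that Step Functions resolves at runtime. A rough, hypothetical sketch of that logic (merge_container_arguments is illustrative, not the actual implementation; it assumes the SDK's Placeholder base class and its to_jsonpath() helper):

from stepfunctions.inputs import Placeholder

def merge_container_arguments(container_arguments, parameters):
    # Hypothetical helper: rewrite AppSpecification so that placeholder
    # arguments are resolved by Step Functions at runtime.
    pieces = []
    has_placeholder = False
    for arg in container_arguments:
        if isinstance(arg, Placeholder):
            pieces.append(arg.to_jsonpath())   # e.g. "$$.Execution.Input['Fecha']"
            has_placeholder = True
        else:
            pieces.append("'{}'".format(arg))  # intrinsic string literals are single-quoted
    app_spec = parameters.setdefault("AppSpecification", {})
    if has_placeholder:
        # ".$" marks the field for runtime evaluation by Step Functions
        app_spec["ContainerArguments.$"] = "States.Array({})".format(", ".join(pieces))
    else:
        app_spec["ContainerArguments"] = container_arguments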

francescocamussoni commented 1 year ago

Hi Wong, thank you for your response.

I don't know how to use parameters inside the processing job. I've tried [screenshot], but it's the same as defining a container argument, so I get the same error [screenshot].
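Until the SDK supports this, one possible workaround (a sketch, untested, and assuming the ingesta step sits at the top level of the definition) is to build the workflow with the literal string so that to_json() succeeds, then rewrite that one field in the generated JSON and push the patched definition with boto3:

import json
import boto3

# Serialize the definition the SDK built (with the literal "--fecha" string).
definition = json.loads(branching_workflow.definition.to_json())

# Replace the literal ContainerArguments with a runtime-resolved ".$" field.
state = definition["States"][inference_config["ingesta_step_name"]]
app_spec = state["Parameters"]["AppSpecification"]
app_spec.pop("ContainerArguments", None)
app_spec["ContainerArguments.$"] = "States.Array('--fecha', $$.Execution.Input.Fecha)"

# Push the patched definition to the state machine the SDK created.
sfn = boto3.client("stepfunctions")
sfn.update_state_machine(
    stateMachineArn=branching_workflow.state_machine_arn,
    definition=json.dumps(definition),
)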