tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.github.io/tfx/
Apache License 2.0

Evaluator fails with ValueError in Airflow when caching is turned off and retries are allowed #2805

Open codesue opened 3 years ago

codesue commented 3 years ago

When running a pipeline in Airflow, if enable_cache is set to False but retries are allowed, a component can run more than once and generate more than one artifact. A downstream component that expects exactly one artifact then fails with a ValueError.

{{base_task_runner.py:115}} INFO - Job 750: Subtask Evaluator     (len(input_dict[constants.MODEL_KEY])))
{{base_task_runner.py:115}} INFO - Job 750: Subtask Evaluator ValueError: There can be only one candidate model, there are 2.

As a workaround, I've been setting enable_cache to True to avoid unexpected behavior due to retries.
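For reference, the flag is set on the pipeline definition; a minimal sketch of the workaround (pipeline name, root, and component list below are placeholders):

from tfx.orchestration import pipeline as tfx_pipeline

# Enable caching at the pipeline level so a retried component reuses the
# earlier successful execution instead of registering a second output artifact.
my_pipeline = tfx_pipeline.Pipeline(
    pipeline_name='my_pipeline',             # placeholder
    pipeline_root='gs://some-bucket/root',   # placeholder
    components=components,                   # the usual list of TFX components
    enable_cache=True,
)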

casassg commented 3 years ago

@charlesccychen @zhitaoli

It seems MLMD is reusing the context and registering 2 output artifacts for the same component run instead of overwriting the first one on the 2nd run (at least that's my intuition).

We found the issue after we restarted execution on the Trainer without cache, and Evaluator was failing shortly after.

No rush or hotfix needed since we can work around it by using the cache, but it would be nice to fix.

1025KB commented 3 years ago

Is retry a functionality provided by Airflow? It seems only part of the pipeline gets retried.

codesue commented 3 years ago

Yes; if a component (operator) fails, Airflow can retry it: https://airflow.apache.org/docs/1.10.6/_api/airflow/operators/index.html?highlight=retry#package-contents
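For example, retries can be configured per operator or via the DAG's default_args; a plain Airflow 1.10-style sketch, independent of TFX:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Any task in this DAG is retried up to 3 times if it fails.
default_args = {
    'start_date': datetime(2021, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('retry_example', default_args=default_args, schedule_interval=None)
flaky = PythonOperator(task_id='flaky_task', python_callable=lambda: None, dag=dag)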

1025KB commented 3 years ago

“Seems MLMD is reusing context and registering 2 output artifacts for the same component run instead of overwriting it on the 2nd run” — this is expected; MLMD won't overwrite artifacts.

Was the first attempt successful? Could you check whether the first artifact is PUBLISHED or PENDING?

casassg commented 3 years ago

To be clear, we are using TFX's existing wrapper for Airflow; we don't have any custom code. This fails on the current release, 0.24.1.

The first attempt was successful, but it probably didn't finish. What happened for us was that:

@codesue can you query MLMD? I can help as well :D
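For the query itself, something along these lines should work against the pipeline's metadata DB (a sketch; assumes a SQLite-backed store and the standard 'Model'/'Examples' type names):

from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

# List the artifacts of a given type and print their 'state' custom property,
# to see whether the first attempt's artifact is pending or published.
# The SQLite path is a placeholder; use the pipeline's metadata connection.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = '/path/to/metadata.db'
connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE)
store = metadata_store.MetadataStore(connection_config)

for artifact in store.get_artifacts_by_type('Model'):
    state = artifact.custom_properties.get('state')
    print(artifact.id, artifact.uri,
          state.string_value if state is not None else '<no state>')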

1025KB commented 3 years ago

My current guess:

TFX uses MLMD to check the status. If the previous execution failed before publishing results to MLMD (the previous artifact is pending), a retry shouldn't cause any issue.

But if TFX had already finished the execution of a certain component (everything is finalized in MLMD) and a retry reruns that component, then from TFX's point of view it doesn't know it's a retry, so the retry is treated as a new execution of the component and causes the issue.

casassg commented 3 years ago

Here's a similar issue we ran into when an ExampleGen was retried by Airflow. The artifact pbtxts in MLMD are below.

1st artifact:

id: 3495
type_id: 5
uri: "gs://some-bucket/BigQueryExampleGen/examples/3312"
custom_properties {
  key: "name"
  value {
    string_value: "examples"
  }
}
custom_properties {
  key: "producer_component"
  value {
    string_value: "BigQueryExampleGen"
  }
}
create_time_since_epoch: 1604792653508
last_update_time_since_epoch: 1604792653508

2nd artifact:

id: 3498
type_id: 5
uri: "gs:/some-bucket/BigQueryExampleGen/examples/3312"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "examples"
  }
}
custom_properties {
  key: "producer_component"
  value {
    string_value: "BigQueryExampleGen"
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
create_time_since_epoch: 1604795761297
last_update_time_since_epoch: 1604795948590

Error in downstream component (StatsGen):

Traceback (most recent call last):
  File "/shared/airflow_package/airflow.pex/.deps/apache_airflow-1.10.4+twtr23-py2.py3-none-any.whl/airflow/models/taskinstance.py", line 922, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/shared/airflow_package/airflow.pex/.deps/apache_airflow-1.10.4+twtr23-py2.py3-none-any.whl/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/shared/airflow_package/airflow.pex/.deps/apache_airflow-1.10.4+twtr23-py2.py3-none-any.whl/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/orchestration/airflow/airflow_component.py", line 79, in _airflow_component_launcher
    launcher.launch()
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/orchestration/launcher/base_component_launcher.py", line 205, in launch
    execution_decision.exec_properties)
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/orchestration/launcher/in_process_component_launcher.py", line 67, in _run_executor
    executor.Do(input_dict, output_dict, exec_properties)
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/components/statistics_gen/executor.py", line 97, in Do
    examples = artifact_utils.get_single_instance(input_dict[EXAMPLES_KEY])
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/types/artifact_utils.py", line 67, in get_single_instance
    len(artifact_list)))
ValueError: expected list length of one but got 2

(There are no more artifacts with type_id = 5 under the same run context.)
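For reference, the failing check in tfx.types.artifact_utils is essentially the following (paraphrased from the error message above, not the verbatim source):

# Approximate paraphrase of tfx.types.artifact_utils.get_single_instance:
# it refuses to pick an artifact when the retried upstream run left two.
def get_single_instance(artifact_list):
  if len(artifact_list) != 1:
    raise ValueError(
        'expected list length of one but got {}'.format(len(artifact_list)))
  return artifact_list[0]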

rcrowe-google commented 3 years ago

b/173098817

casassg commented 3 years ago

Any updates on this?

My guess at this point is that on retry artifacts are not overwritten. That is, if I re-run a component with the same run_id and pipeline_id, the output artifacts should be overwritten. Otherwise, we are going against the constraint of having a single output artifact per component_id/run_id/pipeline_id.
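Not proposing this as the fix, but as an illustration of that constraint: a downstream consumer could in principle de-duplicate by keeping the newest published artifact (hypothetical helper, not part of TFX):

# Hypothetical helper (not part of TFX): given duplicate MLMD artifacts like
# the two pbtxts above, keep the newest one whose 'state' custom property is
# 'published', falling back to the newest overall.
def latest_published(mlmd_artifacts):
  published = [
      a for a in mlmd_artifacts
      if a.custom_properties.get('state') is not None
      and a.custom_properties['state'].string_value == 'published'
  ]
  candidates = published or mlmd_artifacts
  return max(candidates, key=lambda a: a.last_update_time_since_epoch)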

casassg commented 3 years ago

Steps to repro:

import os
import random

# Imports assumed for this repro (TFX component decorator API); module paths
# may vary slightly across TFX versions.
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact
from tfx.dsl.component.experimental.decorators import component
from tfx.types.standard_artifacts import Model

@component
def random_fail(model: OutputArtifact[Model]):
  # Fail roughly half the time so Airflow retries the task.
  assert random.randint(0, 1) == 1
  with open(os.path.join(model.uri, 'model.txt'), 'w') as f:
    f.write('test')

@component
def downstream(model: InputArtifact[Model]):
  pass

c1 = random_fail()
c2 = downstream(model=c1.outputs['model'])

pipeline = Pipeline(components=[c1, c2], ...)

DAG = AirflowDagRunner(AirflowDagConfig({'retries': 3, 'schedule': None})).run(pipeline)

This example will fail on c2 if c1 happens to be retried. There are probably less random ways to test this.
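One less random variant (a sketch): fail only the first attempt by dropping a marker file, so the Airflow retry always succeeds and the duplicate artifacts show up deterministically:

# Sketch of a deterministic variant: the first attempt always fails, the retry
# always succeeds. The marker path is a placeholder and assumes the worker's
# local filesystem survives between attempts.
@component
def fail_once(model: OutputArtifact[Model]):
  marker = '/tmp/fail_once_marker'
  if not os.path.exists(marker):
    open(marker, 'w').close()
    raise RuntimeError('failing the first attempt on purpose')
  with open(os.path.join(model.uri, 'model.txt'), 'w') as f:
    f.write('test')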

molejnik-mergebit commented 3 years ago

Same issue here. Is there any ETA on this?

nmelche commented 3 years ago

Same issue here with tfx.orchestration.kubeflow.kubeflow_dag_runner.KubeflowDagRunner

casassg commented 3 years ago

Out of curiosity @nmelche how do you add retries on KubeflowDagRunner? Using https://github.com/kubeflow/pipelines/blob/master/samples/core/retry/retry.py?
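My guess would be a pipeline operator func that calls set_retry on each ContainerOp; a sketch with the KFP v1 API:

from kfp import dsl
from tfx.orchestration.kubeflow import kubeflow_dag_runner

# Retry every step of the compiled pipeline up to 3 times.
def _add_retries(container_op: dsl.ContainerOp):
  container_op.set_retry(3)

runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    pipeline_operator_funcs=(
        kubeflow_dag_runner.get_default_pipeline_operator_funcs()
        + [_add_retries]))
runner = kubeflow_dag_runner.KubeflowDagRunner(config=runner_config)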

molejnik-mergebit commented 3 years ago

Could the priority of this issue be raised? It is very annoying on bigger pipelines (2k nodes) to have to re-run the entire pipeline if any of the nodes fails, even with the cache enabled. Is there a workaround to re-run just a single (failed) node, like manually deleting the artifact or altering MLMD?

YannisPap commented 3 years ago

Our evaluator also fails in GCP AI pipelines with caching = True.

I'm providing a screenshot, the evaluator logs, and the requirements.txt (for the versions of the libraries used).

I would appreciate even a workaround since it affects our production systems.

Let me know if you need any further details.

Attachments: DeepinScreenshot_select-area_20210512091154, evaluator_logs.txt, requirements.txt

muhammadtehseenSystemsltd commented 3 years ago

I'm facing the same issue in Kubeflow, in the CsvExampleGen component. Caching is on.

manalelidrissi commented 3 years ago

Any updates on this issue?

molejnik-mergebit commented 3 years ago

Can we have at least an estimation when this issue could be addressed?

casassg commented 3 years ago

CC @rcrowe-google

zhitaoli commented 3 years ago

I tried to create a reproduction of the Airflow retry with code similar to what @casassg pasted, but with a bit more determinism:

https://gist.github.com/zhitaoli/b2d92f8ad04d98d99974513563149d33

I was able to reproduce the error once, but after upgrading to tfx 1.0.0 the issue was shadowed by the following error stack:

https://gist.github.com/zhitaoli/7dbaaa42abd8aa78cb54d52a266cd0ee

I'll dig a bit more with @hughmiao to see whether this is fixable.

The original error might be fixable with https://github.com/tensorflow/tfx/pull/4093, but without fixing the above I can't promise anything yet.

molejnik-mergebit commented 2 years ago

What about this issue? Can we have some update? Is there a workaround for this?