tensorflow / model-analysis

Model analysis tools for TensorFlow
Apache License 2.0

TFMA on Flink does not seem to be parallelizing work #170

Open jccarles opened 1 year ago

jccarles commented 1 year ago

Hello, I am running into an issue with the Evaluator component of a TFX pipeline. I use the FlinkRunner for Beam, and the Evaluator component becomes very slow as the data size grows. It appears that all the work is done by a single Task Manager.

System information

I am running a TFX pipeline using Python 3.7 and TFX 1.8.1, which comes with tensorflow-model-analysis==0.39.0. I don't have a small example to reproduce; I can work on one if you think it would help.

Describe the problem

I use the Evaluator TFX component as follows:

evaluator = Evaluator(
    examples=example_gen.outputs[standard_component_specs.EXAMPLES_KEY],
    model=trainer.outputs[standard_component_specs.MODEL_KEY],
    eval_config=eval_config,
)

The eval_config is simple and does not define any splits, so only the eval split is used for evaluation.

To run the TFX pipeline we use the FlinkRunner for Beam. The sidecar image is built from tensorflow/tfx:1.8.1.

We run Flink with a parallelism of 10, so 10 TFRecord files are the input to the Evaluator component.
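
A minimal sketch of how the FlinkRunner and the parallelism of 10 are passed to the TFX pipeline via Beam pipeline args (flag values, names and paths below are placeholders, not our exact production setup):

from tfx.orchestration import pipeline

# Placeholder values; the real addresses, paths and component list differ.
beam_pipeline_args = [
    "--runner=FlinkRunner",
    "--flink_master=<jobmanager-host>:8081",
    "--parallelism=10",
    # SDK harness environment; our sidecar image is built from tensorflow/tfx:1.8.1.
    "--environment_type=DOCKER",
]

tfx_pipeline = pipeline.Pipeline(
    pipeline_name="my_pipeline",
    pipeline_root="/path/to/pipeline_root",
    components=[example_gen, trainer, evaluator],  # components defined as above
    beam_pipeline_args=beam_pipeline_args,
)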

From what we could gather, Beam tells Flink to build a single task for the 3 PTransforms:

"ReadFromTFRecordToArrow" | "FlattenExamples" | "ExtractEvaluateAndWriteResults"

Our issue is that this ends up creating a single subtask in Flink, so a single Task Manager does all the work, as you can see in the attached screenshot. The problem therefore seems to be that the Beam workflow is not parallelized.
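
For reference, a minimal standalone sketch of the same read-then-evaluate flow using the public TFMA Beam API (paths, the eval_config and the Reshuffle remark are illustrative assumptions, not the Evaluator's actual internals); it shows that nothing between the read and ExtractEvaluateAndWriteResults forces a redistribution of the data:

import apache_beam as beam
import tensorflow_model_analysis as tfma
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=FlinkRunner", "--parallelism=10"])

# Illustrative config; the real one is shared further down in this thread.
eval_config = tfma.EvalConfig(model_specs=[tfma.ModelSpec(label_key="label")])
eval_shared_model = tfma.default_eval_shared_model(
    eval_saved_model_path="/path/to/saved_model",  # placeholder
    eval_config=eval_config,
)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        # The 10 TFRecord shards can be read in parallel...
        | "ReadExamples" >> beam.io.ReadFromTFRecord("/path/to/eval-*.tfrecord")
        # ...but with no shuffle boundary (e.g. beam.Reshuffle()) before the
        # evaluation, Flink may chain read + evaluate into a single subtask.
        | "EvaluateAndWrite" >> tfma.ExtractEvaluateAndWriteResults(
            eval_shared_model=eval_shared_model,
            eval_config=eval_config,
            output_path="/path/to/output",  # placeholder
        )
    )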

I have two main questions:

(attached: Screenshot from 2023-01-26 15-18-44)

singhniraj08 commented 1 year ago

@jccarles,

Can you please share the eval_config passed to the Evaluator component so we can analyse the root cause of the issue? Thank you!

jccarles commented 1 year ago

Hello! Thank you for your answer. Here is the eval_config we used; the thresholds have artificially low bounds for testing.

{
  "model_specs": [
    {
      "signature_name": "serving_default",
      "label_key": "label",
      "preprocessing_function_names": [
        "transform_features"
      ]
    }
  ],
  "metrics_specs": [
    {
      "thresholds": {
        "precision": {
          "value_threshold": {
            "lower_bound": 1e-03
          },
          "change_threshold": {
            "absolute": -1e-10,
            "direction": "HIGHER_IS_BETTER"
          }
        }
      }
    }
  ]
}
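
For completeness, a sketch of the same config expressed with the TFMA Python API (assuming TFMA 0.39 field names), mirroring the JSON above:

import tensorflow_model_analysis as tfma

eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(
            signature_name="serving_default",
            label_key="label",
            preprocessing_function_names=["transform_features"],
        )
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            thresholds={
                "precision": tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={"value": 1e-3}
                    ),
                    change_threshold=tfma.GenericChangeThreshold(
                        absolute={"value": -1e-10},
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                    ),
                )
            }
        )
    ],
)
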
singhniraj08 commented 1 year ago

@mdreves, can we dispatch the Evaluator work across different Task Managers? Thanks!

jccarles commented 1 year ago

Hello, thank you for checking this issue. Did you have time to take a look? Have you identified anything so far, and can I help somehow?

Enzo90910 commented 1 year ago

This issue is currently preventing us from using the Evaluator component in production, since it concentrates all the work, and hence the memory requirements, on a single Flink TaskManager.