intel-analytics / ipex-llm

Accelerate local LLM inference and finetuning (LLaMA, Mistral, ChatGLM, Qwen, Baichuan, Mixtral, Gemma, Phi, MiniCPM, etc.) on Intel XPU (e.g., local PC with iGPU and NPU, discrete GPU such as Arc, Flex and Max); seamlessly integrate with llama.cpp, Ollama, HuggingFace, LangChain, LlamaIndex, GraphRAG, DeepSpeed, vLLM, FastChat, Axolotl, etc.
Apache License 2.0

Conf `spark.python.worker.reuse: false` Issue Summary #7761

Open hamham223 opened 1 year ago

hamham223 commented 1 year ago

Problem

Some TensorFlow training unit tests require the following extra config in `conftest.py`, especially for the spark backend. Some ray backend unit tests also use this config:

conf = {"spark.python.worker.reuse": "false"}
sc = init_orca_context(cores=8, conf=conf)
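For context, a minimal sketch of how such a `conftest.py` could wire this in (the fixture name and scope here are illustrative assumptions, not the actual BigDL test fixture):

```python
# conftest.py (sketch; fixture name and scope are assumptions, not the real BigDL conftest)
import pytest

from bigdl.orca import init_orca_context, stop_orca_context


@pytest.fixture(scope="session", autouse=True)
def orca_context():
    # Disable Python worker reuse so every Spark task runs in a fresh worker process.
    conf = {"spark.python.worker.reuse": "false"}
    sc = init_orca_context(cores=8, conf=conf)
    yield sc
    stop_orca_context()
```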

Typical Errors

Removing the above config may cause the following kinds of errors:

```
FAILED test/bigdl/orca/learn/test_tf2_basic.py::TestTFEstimatorBasic::test_dataframe_shard_size - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(10, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/rdd.py", line 2863, in func
    return f(iterator)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 238, in <lambda>
    lambda iter: transform_func(iter, init_params, params)).collect()
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 235, in transform_func
    return SparkRunner(**init_param).step(**param)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 252, in __init__
    self.setup_distributed(self.cluster)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 317, in setup_distributed
    self.strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/util/deprecation.py", line 357, in new_func
    return func(*args, **kwargs)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 253, in __init__
    self).__init__(cluster_resolver, communication_options)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 188, in __init__
    communication_options=communication_options))
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 329, in __init__
    self._initialize_strategy(self._cluster_resolver)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 341, in _initialize_strategy
    self._initialize_multi_worker(cluster_resolver)
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 468, in _initialize_multi_worker
    device_filters=("/job:%s/task:%d" % (task_type, task_id),))
  File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/eager/context.py", line 882, in configure_collective_ops
    raise ValueError("Collective ops are already configured.")
ValueError: Collective ops are already configured.
```

```python
# continous predicting
after_res = est.predict(df, feature_cols=["feature"]).collect()
pred_res = np.concatenate([part["prediction"] for part in after_res])
>       assert np.array_equal(expect_res, pred_res)
E       assert False
E        +  where False = (array([0.33034629, 0.50642633, 0.4721778 , 0.35401052, 0.36536425,
                                  0.28024089, 0.36466506, 0.22905967, 0.30667904, 0.41859537,
                                  0.27126238, 0.27779567, 0.33019286, 0.27564853, 0.38458949,
                                  0.20875439, 0.34339929, 0.4605394 , 0.38060537, 0.31551862,
                                  0.24109799, 0.27530131, 0.38145477, 0.26889354, 0.3314136 ,
                                  0.33034629, 0.50642633, 0.4721778 , 0.35401052, 0.36536425,
                                  0.28024089, 0.36466506, 0.22905967, 0.30667904, 0.41859537,
                                  0.27126238, 0.27779567, 0.33019286, 0.27564853, 0.38458949,
                                  0.20875439, 0.34339929, 0.4605394 , 0.38060537, 0.31551862,
                                  0.24109799, 0.27530131, 0.38145477, 0.26889354, 0.3314136 ,
                                  0.32420057, 0.24693905, 0.32494292, 0.25303987, 0.30210978,
                                  0.35318539, 0.30621374, 0.35859883, 0.29737732, 0.25018737,
                                  0.24927917, 0.11859994, 0.26847011, 0.26930273, 0.31998277,
                                  0.39892027, 0.24674319, 0.16561867, 0.34938154, 0.33357757,
                                  0.3579762 , 0.27510715, 0.25931799, 0.31674844, 0.31037313,
                                  0.32420057, 0.24693905, 0.32494292, 0.25303987, 0.30210978,
                                  0.35318539, 0.30621374, 0.35859883, 0.29737732, 0.25018737,
                                  0.24927917, 0.11859994, 0.26847011, 0.26930273, 0.31998277,
                                  0.39892027, 0.24674319, 0.16561867, 0.34938154, 0.33357757,
                                  0.3579762 , 0.27510715, 0.25931799, 0.31674844, 0.31037313]),
                           array([0.32420057, 0.24693905, 0.32494292, 0.25303987, 0.30210978,
                                  0.35318539, 0.30621374, 0.35859883, 0.29737732, 0.25018737,
                                  0.24927917, 0.11859994, 0.26847011, 0.26930273, 0.31998277,
                                  0.39892027, 0.24674319, 0.16561867, 0.34938154, 0.33357757,
                                  0.3579762 , 0.27510715, 0.25931799, 0.31674844, 0.31037313,
                                  0.38191152, 0.3602553 , 0.28796911, 0.27970612, 0.255009  ,
                                  0.24886124, 0.35878149, 0.26711881, 0.13288987, 0.3203904 ,
                                  0.18340567, 0.2296344 , 0.32722211, 0.43431443, 0.37692329,
                                  0.35792196, 0.27677733, 0.25484768, 0.14131568, 0.41106236,
                                  0.27757517, 0.44170555, 0.22962368, 0.44884759, 0.32345006,
                                  0.33034629, 0.50642633, 0.4721778 , 0.35401052, 0.36536425,
                                  0.28024089, 0.36466506, 0.22905967, 0.30667904, 0.41859537,
                                  0.27126238, 0.27779567, 0.33019286, 0.27564853, 0.38458949,
                                  0.20875439, 0.34339929, 0.4605394 , 0.38060537, 0.31551862,
                                  0.24109799, 0.27530131, 0.38145477, 0.26889354, 0.3314136 ,
                                  0.33034629, 0.50642633, 0.4721778 , 0.35401052, 0.36536425,
                                  0.28024089, 0.36466506, 0.22905967, 0.30667904, 0.41859537,
                                  0.27126238, 0.27779567, 0.33019286, 0.27564853, 0.38458949,
                                  0.20875439, 0.34339929, 0.4605394 , 0.38060537, 0.31551862,
                                  0.24109799, 0.27530131, 0.38145477, 0.26889354, 0.3314136 ]))
E        +    where  = np.array_equal
```
```python
E   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
E   : org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(10, 1) finished unsuccessfully.
E   org.apache.spark.api.python.PythonException: Traceback (most recent call last):
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
E       process()
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
E       out_iter = func(split_index, iterator)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/rdd.py", line 2863, in func
E       return f(iterator)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 251, in <lambda>
E       lambda iter: transform_func(iter, init_params, params)).collect()
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 246, in transform_func
E       return SparkRunner(**init_param).step(**param)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 260, in __init__
E       self.model = self.model_creator(self.config)
E     File "/home/runner/_work/BigDL/BigDL/python/orca/test/bigdl/orca/learn/test_tf2_basic.py", line 52, in model_creator
E       model = simple_model(config)
E     File "/home/runner/_work/BigDL/BigDL/python/orca/test/bigdl/orca/learn/test_tf2_basic.py", line 33, in simple_model
E       tf.keras.layers.Dense(1)])
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/training/tracking/base.py", line 587, in _method_wrapper
E       result = method(self, *args, **kwargs)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/keras/utils/traceback_utils.py", line 67, in error_handler
E       raise e.with_traceback(filtered_tb) from None
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 55, in quick_execute
E       inputs, attrs, num_outputs)
E   tensorflow.python.framework.errors_impl.UnavailableError: Collective ops is aborted by: failed to connect to all addresses
E   Additional GRPC error information from remote target /job:worker/replica:0/task:0:
E   :{"created":"@1676289817.379653076","description":"Failed to pick subchannel","file":"external/com_github_grpc_grpc/src/core/ext/filters/client_channel/client_channel.cc","file_line":3940,"referenced_errors":[{"created":"@1676289817.374523513","description":"failed to connect to all addresses","file":"external/com_github_grpc_grpc/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":392,"grpc_status":14}]}
E   The error could be from a previous operation. Restart your program to reset. [Op:CollectiveBcastRecv]
```
```python
        trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25,
                    feature_cols=["feature"],
                    label_cols=["label"],
                    validation_data=df,
>                   validation_steps=1)

test/bigdl/orca/learn/spark/test_tf2_estimator.py:475:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py:258: in fit
    lambda iter: transform_func(iter, init_params, params)).collect()
/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/rdd.py:949: in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
/opt/conda/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py:1305: in __call__
    answer, self.gateway_client, self.target_id, self.name)
/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py:111: in deco
    return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

answer = 'xro1252'
gateway_client =
target_id = 'z:org.apache.spark.api.python.PythonRDD', name = 'collectAndServe'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python integer,
        string representation of objects are converted to JavaObject instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java Gateway.
            Only necessary if the answer is a reference (e.g., object, list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
E   : org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(88, 0) finished unsuccessfully.
E   org.apache.spark.api.python.PythonException: Traceback (most recent call last):
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
E       process()
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
E       out_iter = func(split_index, iterator)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/rdd.py", line 2863, in func
E       return f(iterator)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 258, in <lambda>
E       lambda iter: transform_func(iter, init_params, params)).collect()
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 253, in transform_func
E       return SparkRunner(**init_param).step(**param)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 226, in __init__
E       self.setup()
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 296, in setup
E       tf.config.threading.set_intra_op_parallelism_threads(self.intra_op_parallelism)
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/framework/config.py", line 118, in set_intra_op_parallelism_threads
E       context.context().intra_op_parallelism_threads = num_threads
E     File "/opt/conda/envs/py37/lib/python3.7/site-packages/tensorflow/python/eager/context.py", line 1827, in intra_op_parallelism_threads
E       "Intra op parallelism cannot be modified after initialization.")
E   RuntimeError: Intra op parallelism cannot be modified after initialization.
```
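The `Intra op parallelism cannot be modified after initialization` case is plain TensorFlow behavior rather than anything Spark-specific: once the eager context of a (reused) worker process has been initialized, its threading settings can no longer be changed. A standalone sketch of the same failure outside Spark, assuming a stock TensorFlow install:

```python
import tensorflow as tf

tf.constant(1.0)  # any op initializes the eager context of this process

# In an already-initialized process this raises
# "RuntimeError: Intra op parallelism cannot be modified after initialization."
tf.config.threading.set_intra_op_parallelism_threads(4)
```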
```python
    res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25,
                      feature_cols=["feature"],
                      label_cols=["label"],
                      validation_data=df,
>                     validation_steps=1)

test/bigdl/orca/learn/spark/test_tf2_estimator.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py:186: in fit
    cluster_info=self._get_cluster_info(sc),
/opt/conda/envs/py37/lib/python3.7/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py:111: in _get_cluster_info
    cluster_info = self.workerRDD.barrier().mapPartitions(find_ip_and_free_port).collect()
/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/rdd.py:949: in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
/opt/conda/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py:1305: in __call__
    answer, self.gateway_client, self.target_id, self.name)
/opt/conda/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py:111: in deco
    return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

answer = 'xro1586'
gateway_client =
target_id = 'z:org.apache.spark.api.python.PythonRDD', name = 'collectAndServe'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python integer,
        string representation of objects are converted to JavaObject instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java Gateway.
            Only necessary if the answer is a reference (e.g., object, list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
E   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 100.0 failed 1 times, most recent failure: Lost task 4.0 in stage 100.0 (TID 627) (localhost executor driver): java.net.SocketException: Socket is closed
```

How to reproduce the problem

Remove the above config in:

and run the orca pytests.

Current Solutions

hamham223 commented 1 year ago

Update on error 2 (arrays not equal)

The two predicted arrays are not equal because the input dataframes are not the same.

```python
rdd_map = rdd.map(lambda x: (DenseVector(np.random.randn(1, ).astype(np.float32)),
                             int(np.random.randint(0, 2, size=())),
                             os.getpid()))
df = rdd_map.toDF(["feature", "label", "pid"])  # this is lazy by default
```

Experiment 1

With the config

    conf = {"spark.python.worker.reuse": "false"}
    sc = init_orca_context(conf=conf)
(screenshots)

Every time, a new process is created with the same random seed, so df.collect() always returns the same values, and therefore the two predict calls give the same result.

```python
before_res = trainer.predict(df, feature_cols=["feature"]).collect()
expect_res = np.concatenate([part["prediction"] for part in before_res])

trainer.load(os.path.join(temp_dir, "cifar10_savemodel"))

# continous predicting
after_res = trainer.predict(df, feature_cols=["feature"]).collect()
pred_res = np.concatenate([part["prediction"] for part in after_res])

assert np.array_equal(expect_res, pred_res)
```
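The "fresh process, same random values" behavior described above comes from forking: with `spark.python.worker.reuse=false`, each task gets a new Python worker forked from the PySpark daemon, and a freshly forked child inherits the daemon's untouched NumPy RNG state. A minimal sketch of just that mechanism (plain `os.fork`, Unix only, not orca code):

```python
import os
import numpy as np


def draw_in_fresh_child():
    # Each child inherits the parent's RNG state at fork time, so every fresh
    # child prints the same numbers even though no seed is set explicitly.
    pid = os.fork()
    if pid == 0:
        print(os.getpid(), np.random.randn(3))
        os._exit(0)
    os.waitpid(pid, 0)


draw_in_fresh_child()
draw_in_fresh_child()  # different pid, same random values as the first child
```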

Experiment 2

Without the config

```python
sc = init_orca_context()
```

(screenshots)

Without the config, Spark reuses a fixed set of Python workers (i.e. the random numbers are always produced in the same processes, whose RNG state advances between jobs), so the dataframe is different in the two predict calls and the results are not the same.
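One way to check this directly (a sketch that assumes the `df` with the extra `pid` column defined above and a running orca context) is to collect the dataframe twice and compare both the values and the worker pids:

```python
import numpy as np

first = df.collect()
second = df.collect()

same_values = np.allclose([r["feature"][0] for r in first],
                          [r["feature"][0] for r in second])
same_pids = [r["pid"] for r in first] == [r["pid"] for r in second]

# reuse=false: new worker pids on each collect, but identical values (fresh RNG state)
# reuse=true (default): same worker pids, but the values typically differ (RNG state has advanced)
print(same_values, same_pids)
```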

Reference:

hamham223 commented 1 year ago

So for the ray backend tests, as long as we can keep the dataframe the same, the config is not needed? @sgwhat Could you please look into those UTs?

hamham223 commented 1 year ago

https://github.com/intel-analytics/BigDL/pull/7948

This PR can serve as a reference.

Are there only two kinds of errors?

Next step: classify which UT can raise which kind of error, and check whether it is stable.

hamham223 commented 1 year ago

This UT reproduces the error `Collective ops are already configured.`:

```python
import numpy as np
import tensorflow as tf

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2 import Estimator
from bigdl.orca import OrcaContext

def simple_model(config):
    model = tf.keras.models.Sequential([tf.keras.layers.Dense(10, input_shape=(1,)),
                                        tf.keras.layers.Dense(1)])
    return model

def multi_output_model(config):
    image_input_1 = tf.keras.Input(shape=(32, 32, 3), name="input_1")
    image_input_2 = tf.keras.Input(shape=(32, 32, 3), name="input_2")

    x1 = tf.keras.layers.Conv2D(3, 3)(image_input_1)
    x1 = tf.keras.layers.GlobalMaxPooling2D()(x1)
    x2 = tf.keras.layers.Conv2D(3, 3)(image_input_2)
    x2 = tf.keras.layers.GlobalMaxPooling2D()(x2)
    x = tf.keras.layers.concatenate([x1, x2])

    score_output = tf.keras.layers.Dense(5, name="score_output")(x)
    class_output = tf.keras.layers.Dense(5, name="class_output")(x)

    model = tf.keras.Model(
        inputs=[image_input_1, image_input_2], outputs=[score_output, class_output]
    )
    return model

def compile_args(config):
    import tensorflow as tf
    if "lr" in config:
        lr = config["lr"]
    else:
        lr = 1e-3
    args = {
        "optimizer": tf.keras.optimizers.SGD(lr),
        "loss": "mean_squared_error",
        "metrics": ["mean_squared_error"]
    }
    return args

def model_creator(config):
    model = simple_model(config)
    model.compile(**compile_args(config))
    return model

def test_dataframe_different_train_val():
    sc = OrcaContext.get_spark_context()
    rdd = sc.range(0, 100, numSlices=10)
    spark = OrcaContext.get_spark_session()
    from pyspark.ml.linalg import DenseVector
    df = rdd.map(lambda x: (DenseVector(np.random.randn(1, ).astype(np.float32)),
                            int(np.random.randint(0, 2, size=())))).toDF(["feature", "label"])
    val_rdd = sc.range(0, 20, numSlices=6)
    val_df = val_rdd.map(lambda x: (DenseVector(np.random.randn(1, ).astype(np.float32)),
                         int(np.random.randint(0, 2, size=())))).toDF(["feature", "label"])
    config = {
        "lr": 0.2
    }
    trainer = Estimator.from_keras(
        model_creator=model_creator,
        verbose=True,
        config=config,
        workers_per_node=2,
        backend="spark")
    res = trainer.fit(df, epochs=1, batch_size=4, steps_per_epoch=25,
                        validation_data=val_df,
                        validation_steps=2,
                        feature_cols=["feature"],
                        label_cols=["label"])
    res = trainer.evaluate(val_df, batch_size=4, num_steps=25, feature_cols=["feature"],
                             label_cols=["label"])
    print("validation result: ", res)
    res = trainer.predict(df, feature_cols=["feature"]).collect()
    print("predict result: ", res)
    trainer.shutdown()

sc = init_orca_context()

test_dataframe_different_train_val()
stop_orca_context()
```

hamham223 commented 1 year ago

(screenshot)

Oops! The ray estimator is re-using the runner.

hamham223 commented 1 year ago

Update:

The `Collective ops are already configured` error seems to be due to the task_id and the pid not being consistent: a reused worker process (same pid) can be assigned a different task_id in the next job.

Traceback:

spark_runner.py Ln 332:

print("config tf worker in pid: "+str(os.getpid()))
print(os.environ["TF_CONFIG"])
self.strategy = tf.distribute.MultiWorkerMirroredStrategy() # get task_id from "TF_CONFIG"

⬇️ collective_all_reduce_strategy.py Ln 464:

```python
      context.context().configure_collective_ops(
          collective_leader=multi_worker_util.collective_leader(
              cluster_spec, task_type, task_id),
          scoped_allocator_enabled_ops=("CollectiveReduce",),
          device_filters=("/job:%s/task:%d" % (task_type, task_id),))
```

⬇️ context.py Line 876:

    print("pid: "+str(os.getpid())) # those print are added by me
    print("device filter to config: "+ str(device_filters))
    print("device filter to be configed: "+ str(self._collective_device_filters))
    if self._collective_leader is not None:
      if (self._collective_leader != collective_leader or
          self._collective_scoped_allocator_enabled_ops !=
          scoped_allocator_enabled_ops or
          self._collective_use_nccl_communication != use_nccl_communication or
          self._collective_device_filters != device_filters):
        print(device_filters)
        print(self._collective_device_filters)
        raise ValueError("Collective ops are already configured.")
      else:
        return

First time (called by trainer.fit):

```
config tf worker in pid: 2957
{"cluster": {"worker": ["ip:59429", "ip:41471"]}, "task": {"type": "worker", "index": 0}}
pid: 2957 device filter to config: ('/job:worker/task:0',)
pid: 2957 device filter to be configed: None

config tf worker in pid: 2953
{"cluster": {"worker": ["ip:59429", "ip:41471"]}, "task": {"type": "worker", "index": 1}}
pid: 2953 device filter to config: ('/job:worker/task:1',)
pid: 2953 device filter to be configed: None
```

Second time (called by trainer.evaluate):

```
config tf worker in pid: 2957
{"cluster": {"worker": ["ip:41973", "ip:58869"]}, "task": {"type": "worker", "index": 1}}
pid: 2957 device filter to config: ('/job:worker/task:1',)
pid: 2957 device filter to be configed: ('/job:worker/task:0',)
```
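In other words, with worker reuse the same Python process (pid 2957) is handed task index 0 in the first job and task index 1 in the second, so the second `configure_collective_ops` call passes device filters that no longer match what the process recorded the first time. A simplified, standalone model of that guard (an illustration of the failing condition, not the real TensorFlow API):

```python
class FakeEagerContext:
    """Mimics the already-configured check in tensorflow/python/eager/context.py."""

    def __init__(self):
        self._collective_device_filters = None

    def configure_collective_ops(self, device_filters):
        if self._collective_device_filters is not None:
            if self._collective_device_filters != device_filters:
                raise ValueError("Collective ops are already configured.")
            return
        self._collective_device_filters = device_filters


ctx = FakeEagerContext()                                # one eager context per Python process
ctx.configure_collective_ops(("/job:worker/task:0",))   # first job: this pid is task 0
ctx.configure_collective_ops(("/job:worker/task:1",))   # reused pid now gets task 1 -> ValueError
```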