keras-team / keras

Deep Learning for humans
http://keras.io/
Apache License 2.0

TensorFlow MultiWorkerMirroredStrategy and input dictionaries #20329

Open dryglicki opened 1 week ago

dryglicki commented 1 week ago

Hello. I wonder if I've stumbled on another corner case. Reproducing this in code will be challenging for me, but I'll give it a shot with enough of the pieces.

Versions

Keras: 3.5.0
TensorFlow: 2.17.0

Environment

Slurm HPC

Situation

I am trying to use MirroredStrategy and MultiWorkerMirroredStrategy for parallel runs on an HPC that uses Slurm as the scheduler. The if-block that selects the strategy looks like this:

    if DISTRIBUTED:
        if nodes > 1:
            # Multi-node: resolve the cluster from the Slurm environment.
            print(f"Number of nodes: {nodes}. MultiWorker")
            cluster_resolver = tf.distribute.cluster_resolver.SlurmClusterResolver()
            mirr_strat = tf.distribute.MultiWorkerMirroredStrategy(cluster_resolver = cluster_resolver)
        else:
            # Single node: mirror across the local GPUs only.
            print(f"Number of nodes: {nodes}. MirroredStrategy only.")
            mirr_strat = tf.distribute.MirroredStrategy()
        num_gpu = mirr_strat.num_replicas_in_sync
    else:
        num_gpu = 1

    # Guard against a CPU-only run reporting zero replicas.
    if num_gpu == 0: num_gpu = 1
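For context, model construction happens under the strategy scope. Here is a minimal sketch of that part; the layer stack and compile arguments are placeholders, not my actual model:

    import tensorflow as tf
    import keras

    with mirr_strat.scope():
        # Variable-creating code (model, optimizer, metrics) must run inside
        # the strategy scope so variables are mirrored across replicas.
        model = keras.Sequential([
            keras.layers.Input(shape=(128, 128, 9)),
            keras.layers.Conv2D(16, 3, padding="same", activation="relu"),
        ])
        model.compile(optimizer="adam", loss="mse")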

I use a PyDataset class to feed the data into the model. Its __getitem__ returns:

    def __getitem__(self, idx: int):

        # Signal end-of-epoch to the iterator.
        if idx >= self.__len__(): raise StopIteration

        # Slice the file list for this batch.
        low = idx * self.batch_size
        high = min(low + self.batch_size, self.tmplen)

        inputs, outputs = self._extract_data_from_hdf5(self.file_list[low:high])

        # inputs is a dict mapping input names to arrays.
        return [inputs, outputs]
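For reference, the surrounding class looks roughly like this. This is a sketch reconstructed from the attributes used above; HDF5Dataset and the constructor arguments are illustrative names, and _extract_data_from_hdf5 is elided:

    import math
    import keras

    class HDF5Dataset(keras.utils.PyDataset):
        def __init__(self, file_list, batch_size, **kwargs):
            super().__init__(**kwargs)
            self.file_list = file_list
            self.batch_size = batch_size
            self.tmplen = len(file_list)

        def __len__(self):
            # Number of batches per epoch.
            return math.ceil(self.tmplen / self.batch_size)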

inputs is a dictionary, and this is where I'm running into trouble. I want to be clear about this: single-node, multi-GPU with MirroredStrategy works just fine, and serial execution works just fine. In the Slurm submission script, the job is run like this:

srun python3 ${runScript} ${CFGTRAINDIR}/${yamlFile} --nodes ${SLURM_NNODES}

This job fails with the following error (which I've truncated):

    history = model.fit(
              ^^^^^^^^^^
  File "/anvil/projects/pur230005/apps/miniforge3_03_2024/envs/tensorflow_2d17/lib/python3.11/site-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/anvil/projects/pur230005/apps/miniforge3_03_2024/envs/tensorflow_2d17/lib/python3.11/site-packages/tensorflow/python/framework/constant_op.py", line 108, in convert_to_eager_tensor
    return ops.EagerTensor(value, ctx.device_name, dtype)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Attempt to convert a value ({'input_priors': PerReplica:{
  0: <tf.Tensor: shape=(6, 6, 128, 128, 9), dtype=float32, numpy=
array([[[[[0.74294865, 0.742778  , 0.74048847, ..., 0.7334533 ,
           0.73980963, 0.7356777 ],
          [0.7488563 , 0.7481118 , 0.74497646, ..., 0.7395892 ,
           0.7464062 , 0.74162275],
          [0.7500703 , 0.75177586, 0.7484259 , ..., 0.74356276,
           0.7494963 , 0.746145  ],
          ...,
          [ 2.6311924 ,  0.90699947,  0.22517234, ...,  0.88064986,
            1.1493572 ,  1.9374763 ],
          [ 2.634326  ,  0.89669716,  0.22517234, ...,  0.8837486 ,
            1.1522278 ,  1.9372374 ],
          [ 2.637458  ,  0.88639283,  0.22517234, ...,  0.8868496 ,
            1.1550914 ,  1.9369978 ]]]]], dtype=float32)>
}}) with an unsupported type (<class 'dict'>) to a Tensor.

The function itself looks like this, if I'm tracing back correctly:

def convert_to_eager_tensor(value, ctx, dtype=None) -> ops._EagerTensorBase:
  """Converts the given `value` to an `EagerTensor`.

  Note that this function could return cached copies of created constants for
  performance reasons.

  Args:
    value: value to convert to EagerTensor.
    ctx: value of context.context().
    dtype: optional desired dtype of the converted EagerTensor.

  Returns:
    EagerTensor created from value.

  Raises:
    TypeError: if `dtype` is not compatible with the type of t.
  """
  if isinstance(value, np.ndarray):
    # Make a copy explicitly because the EagerTensor might share the underlying
    # memory with the input array. Without this copy, users will be able to
    # modify the EagerTensor after its creation by changing the input array.
    value = value.copy()
  if isinstance(value, ops.EagerTensor):
    if dtype is not None and value.dtype != dtype:
      raise TypeError(f"Expected tensor {value} with dtype {dtype!r}, but got "
                      f"dtype {value.dtype!r}.")
    return value
  if dtype is not None:
    try:
      dtype = dtype.as_datatype_enum
    except AttributeError:
      dtype = dtypes.as_dtype(dtype).as_datatype_enum
  ctx.ensure_initialized()
  return ops.EagerTensor(value, ctx.device_name, dtype)

So what appears to be happening is that MultiWorkerMirroredStrategy wraps each input tensor in a PerReplica container, so what reaches convert_to_eager_tensor is a nested structure: a dict whose values are PerReplica objects, and the dict itself cannot be converted to a tensor.
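The same structure can be reproduced with a toy distributed dataset (a sketch; the input name and shapes just mimic my data):

    import tensorflow as tf

    strategy = tf.distribute.MirroredStrategy()  # >1 replica reproduces the nesting

    dataset = tf.data.Dataset.from_tensor_slices(
        {"input_priors": tf.zeros((8, 6, 128, 128, 9))}
    ).batch(4)
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

    batch = next(iter(dist_dataset))
    # With multiple replicas, each dict value is a PerReplica container:
    # {'input_priors': PerReplica:{0: <tf.Tensor ...>, 1: <tf.Tensor ...>}}
    print(batch)

    # The per-replica tensors can be unwrapped explicitly if needed:
    local = strategy.experimental_local_results(batch["input_priors"])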

I know you've all said that you aren't supporting nested dictionaries or lists (I can't recall the specifics), but what am I supposed to do here?
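The only workaround I can think of (purely an assumption on my part, not something the docs promise) is to wrap the PyDataset in a tf.data.Dataset so tf.distribute handles the dict natively; something like the following, where train_ds, EPOCHS, and the TensorSpec shapes are placeholders for my real ones:

    import tensorflow as tf

    def gen():
        for i in range(len(train_ds)):       # train_ds: the PyDataset instance
            inputs, outputs = train_ds[i]
            yield inputs, outputs

    tf_ds = tf.data.Dataset.from_generator(
        gen,
        output_signature=(
            {"input_priors": tf.TensorSpec(shape=(None, 6, 128, 128, 9),
                                           dtype=tf.float32)},
            tf.TensorSpec(shape=(None, 128, 128, 1), dtype=tf.float32),
        ),
    )
    history = model.fit(tf_ds, epochs=EPOCHS)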

As an addendum, if this is a TF issue and not a Keras issue, please let me know.

fchollet commented 1 week ago

This job will fail with this error (that I've truncated):

Please disable traceback filtering (keras.config.disable_traceback_filtering()) and rerun. I'd like to see where exactly this fails.
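For example, call it once near the top of the script, before fit() runs (a minimal placement sketch):

    import keras

    # Disable Keras traceback filtering globally so the full stack,
    # including TensorFlow internals, shows up in the error.
    keras.config.disable_traceback_filtering()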

dryglicki commented 1 week ago

@fchollet Will do. My response time will be a little slow, since we've been affected by Hurricane Milton and had to evacuate. I just submitted the job on the HPC.

dryglicki commented 5 days ago

@fchollet I finally have error output, after two days of sitting in the queue plus hurricane travel. It is still failing with the same error: the dict type conversion. I put K.config.disable_traceback_filtering() at the top, after the imports but before def main(), if that matters at all.

Error output