ssubbayya opened this issue 1 year ago
@ssubbayya This looks like a warning message. Is there an error stack after this message, or does training start and finish? @edknv do you mind putting your insight here?
@ssubbayya Can you please share more information on how you arrived at that warning? A minimal reproducible code example would be great. I'm particularly confused because you are using the merlin-pytorch image, but Merlin Models doesn't have PyTorch support yet.
@ssubbayya I assumed you were using the merlin-tensorflow:22.12 image? If you want to use Merlin Models, it currently only supports TensorFlow, as @edknv mentioned.
Sorry, I am using nvcr.io/nvidia/merlin/merlin-tensorflow:22.12
I am trying to run the following code; I have 4 different .parquet files.

```python
%%writefile './tf_trainer.py'
import os

MPI_SIZE = int(os.getenv("OMPI_COMM_WORLD_SIZE"))
MPI_RANK = int(os.getenv("OMPI_COMM_WORLD_RANK"))
os.environ["CUDA_VISIBLE_DEVICES"] = str(MPI_RANK)

import argparse

import nvtabular as nvt
from nvtabular.ops import *
from merlin.models.utils.example_utils import workflow_fit_transform
from merlin.schema.tags import Tags
import merlin.models.tf as mm
from merlin.io.dataset import Dataset
import tensorflow as tf

parser = argparse.ArgumentParser(description="Hyperparameters for model training")
parser.add_argument("--batch-size", type=str, help="Batch size per GPU worker")
parser.add_argument("--path", type=str, help="Directory with training and validation data")
args = parser.parse_args()

# each MPI worker reads its own partition file
train = Dataset(os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"))
valid = Dataset(os.path.join(args.path, "valid", "part_" + str(MPI_RANK) + ".parquet"))

target_column = train.schema.select_by_tag(Tags.TARGET).column_names[0]

train_loader = mm.Loader(
    train,
    schema=train.schema,
    batch_size=int(args.batch_size),
    shuffle=True,
    drop_last=True,
)
valid_loader = mm.Loader(
    valid,
    schema=valid.schema,
    batch_size=int(args.batch_size),
    shuffle=False,
    drop_last=True,
)

print("Number batches: " + str(len(train_loader)))

model = mm.DLRMModel(
    train.schema,
    embedding_dim=16,
    bottom_block=mm.MLPBlock([32, 16]),
    top_block=mm.MLPBlock([32, 16]),
    prediction_tasks=mm.BinaryOutput(target_column),
)

opt = tf.keras.optimizers.Adagrad(learning_rate=0.01)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
losses = model.fit(train_loader)

print(model.evaluate(valid, batch_size=int(args.batch_size), return_dict=True))
```

I launch it with:

```
horovodrun -np 4 python tf_trainer.py --batch-size 16834 --path output
```
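For reference, this is the directory layout the script assumes, reconstructed from the `os.path.join` calls above (file names follow the `part_<rank>` pattern the script builds):

```
output/
  train/part_0.parquet ... part_3.parquet
  valid/part_0.parquet ... part_3.parquet
```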
Hello @ssubbayya, thanks for reporting the bug. You are correct. I found a workaround so that it will train: you need to pass `global_size=1` and `global_rank=0` to the loaders:
```python
train_loader = mm.Loader(
    train,
    schema=train.schema,
    batch_size=int(args.batch_size),
    shuffle=True,
    drop_last=True,
    global_size=1,
    global_rank=0,
)
valid_loader = mm.Loader(
    valid,
    schema=valid.schema,
    batch_size=int(args.batch_size),
    shuffle=False,
    drop_last=True,
    global_size=1,
    global_rank=0,
)
train_loader.global_rank = 0
valid_loader.global_rank = 0

print("Number batches: " + str(len(train_loader)))

model = mm.DLRMModel(
    train.schema,
    embedding_dim=16,
    bottom_block=mm.MLPBlock([32, 16]),
    top_block=mm.MLPBlock([32, 16]),
    prediction_tasks=mm.BinaryOutput(target_column),
)

opt = tf.keras.optimizers.Adagrad(learning_rate=0.01)
model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
losses = model.fit(train_loader)

print(model.evaluate(valid_loader, batch_size=int(args.batch_size), return_dict=True))
```
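For context, a hedged sketch of the two sharding setups (it reuses the script's `os`, `args`, `Dataset`, and `mm`; `hvd.rank()`/`hvd.size()` are standard Horovod calls, but I have not verified this combination against the 22.12 image):

```python
import horovod.tensorflow as hvd

hvd.init()

# Setup A (what the script above does): each worker opens only its own file,
# so the loader must not shard again -- hence global_size=1, global_rank=0.
train = Dataset(os.path.join(args.path, "train", f"part_{hvd.rank()}.parquet"))
train_loader = mm.Loader(
    train, schema=train.schema, batch_size=int(args.batch_size),
    shuffle=True, drop_last=True, global_size=1, global_rank=0,
)

# Setup B (alternative): every worker opens the full dataset and lets the
# loader shard it, using Horovod's real world size and rank.
full = Dataset(os.path.join(args.path, "train"), engine="parquet")
train_loader = mm.Loader(
    full, schema=full.schema, batch_size=int(args.batch_size),
    shuffle=True, drop_last=True, global_size=hvd.size(), global_rank=hvd.rank(),
)
```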
mpirun detected that one or more processes exited with non-zero status, thus causing the job to be terminated. The first process to do so was:
train = Dataset(os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"))
valid = Dataset(os.path.join(args.path, "valid", "part_" + str(MPI_RANK) + ".parquet"))

Can you try adding a `part_size` parameter to the `Dataset` calls above?

Dataset(os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"), part_size='100MB')

(or 300MB or 500MB)?
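In the trainer script that would look like this (a sketch; `part_size` caps how much of each file is read into a single GPU partition, and 100MB is just a starting value to tune):

```python
# smaller partitions -> lower peak GPU memory while iterating batches
train = Dataset(
    os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"),
    part_size="100MB",
)
valid = Dataset(
    os.path.join(args.path, "valid", "part_" + str(MPI_RANK) + ".parquet"),
    part_size="100MB",
)
```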
@ssubbayya please also add these lines at the very beginning of your code/notebook, before TensorFlow is imported:

```python
import os

os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
```
@rnyak Thanks, that worked! The out-of-memory error is gone, but now I get the following error:
mpirun detected that one or more processes exited with non-zero status, thus causing the job to be terminated. The first process to do so was:
@ssubbayya
ValueError: None values not supported.
It sounds like the dataset contains NaN/None values, is that correct? You should be able to test it like this:

Dataset(...).to_ddf().isna().sum().compute()

Can you fill the NaN values?
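If it does, a minimal check-and-fill sketch (assuming a dask/dask_cudf-backed `Dataset`; `fillna(0)` is only a placeholder strategy and the `output/train_clean` path is hypothetical):

```python
from merlin.io.dataset import Dataset

ds = Dataset("output/train/part_0.parquet")

# count missing values per column and show only the offending columns
na_counts = ds.to_ddf().isna().sum().compute()
print(na_counts[na_counts > 0])

# fill missing values and persist a cleaned copy for training
clean_ddf = ds.to_ddf().fillna(0)
Dataset(clean_ddf).to_parquet("output/train_clean")
```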
It seems that the dataset is not balanced across workers:

[1,1]:Number batches: 53
[1,0]:Number batches: 53
[1,3]:Number batches: 47
[1,2]:Number batches: 53

Worker 3 has only 47 batches while the other ones have 53. I think that will be another problem after the NaN values are fixed.
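One hedged way to work around the imbalance until the data is repartitioned: cap `steps_per_epoch` at the global minimum batch count so all workers stay in lock-step (this assumes the failure comes from one rank running out of batches during a collective op; `hvd.allgather` is a standard Horovod collective):

```python
import horovod.tensorflow as hvd
import tensorflow as tf

hvd.init()

# gather every worker's batch count, then train only as many steps as the
# smallest worker can supply so no rank waits on a collective forever
counts = hvd.allgather(tf.constant([len(train_loader)], dtype=tf.int32))
steps = int(tf.reduce_min(counts).numpy())

model.fit(train_loader, steps_per_epoch=steps)
```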
@rnyak
The same dataset works well with PyTorch, and I checked that it does not have NaNs. I think some function is returning a None value.

I do not know how to fix the unequal number of batches; that is just the way NVTabular partitions the data.
@rnyak

```python
print(train.to_ddf().isna().sum().compute().sum())
print(train1.to_ddf().isna().sum().compute().sum())
print(train2.to_ddf().isna().sum().compute().sum())
print(train3.to_ddf().isna().sum().compute().sum())
```

All returned zero. It looks like some of the TensorFlow/NVIDIA functions might have indentation issues; if a function has an indentation issue, it can return None.
Bug description

Steps/Code to reproduce bug

Expected behavior

Environment details: merlin-pytorch:22.12 image.

Additional context