intel-analytics / analytics-zoo

Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray
https://analytics-zoo.readthedocs.io/
Apache License 2.0
16 stars 3 forks source link

Executor out of memory with long training #583

Open yatinece opened 3 years ago

yatinece commented 3 years ago

Hi ,

I am trying to train the implementation of Factorization machine using analytics zoo orca estimator. The problem I face is I get the following error when I run it for big epochs

2020-10-22 06:19:25 ERROR YarnClusterScheduler:73 - Lost executor 8 on hdp2stl020386.mastercard.int: Container killed by YARN for exceeding memory limits. 39.0 GB of 39 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714. 2020-10-22 06:19:46 ERROR TaskSetManager:73 - Task 16 in stage 63971.0 failed 4 times; aborting job Traceback (most recent call last): File "digital_standard_score.py", line 311, in est_ck.fit(data=train_df,batch_size=batch_size,epochs=max_epoch,featurecols=[ 'input'+str(k+1) for k in range(ik)],labels_cols=[ target_feature],validation_data= test_df,checkpoint_trigger=EveryEpoch()) File "/dfs/2/yarn/nm/usercache/e074454/appcache/application_1602667222169_23829/container_e20_1602667222169_23829_01_000007/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip/zoo/orca/learn/tf/estimator.py", line 476, in fit File "/dfs/2/yarn/nm/usercache/e074454/appcache/application_1602667222169_23829/container_e20_1602667222169_23829_01_000007/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip/zoo/tfpark/tf_optimizer.py", line 744, in optimize File "/dfs/2/yarn/nm/usercache/e074454/appcache/application_1602667222169_23829/container_e20_1602667222169_23829_01_000007/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip/zoo/pipeline/estimator/estimator.py", line 168, in train_minibatch File "/dfs/2/yarn/nm/usercache/e074454/appcache/application_1602667222169_23829/container_e20_1602667222169_23829_01_000007/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip/zoo/common/utils.py", line 133, in callZooFunc File "/dfs/2/yarn/nm/usercache/e074454/appcache/application_1602667222169_23829/container_e20_1602667222169_23829_01_000007/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip/zoo/common/utils.py", line 127, in callZooFunc File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4204.5870739/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4204.5870739/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/opt/cloudera/parcels/CDH-6.3.3-1.cdh6.3.3.p4204.5870739/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o118.estimatorTrainMiniBatch. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 16 in stage 63971.0 failed 4 times, most recent failure: Lost task 16.3 in stage 63971.0 (TID 639718, hdp2stl020395.mastercard.int, executor 21): java.util.concurrent.ExecutionException: java.lang.RuntimeException: Didn't find weight block test_0weightBytes14 in the block manager. Did you initialize this AllReduceParameter on every executor?

  at java.util.concurrent.FutureTask.report(FutureTask.java:122)

This is the discussion on the error in analytics zoo :- https://github.com/intel-analytics/analytics-zoo-internal/issues/860 which say this is a memory issue. I have increased the executor memory and reduce the batch size, but it will delay the memory exception. If it was failing in 2 epoch, now it will fail in 3rd epoch.

Is it an issue that the job keeps on capturing more memory as it runs?. Can we do something to reduce memory consumption?.

qiyuangong commented 3 years ago

Hi, your application hit memory limitation set for Yarn container. If this memory usage is correct, then you can increase max container memory on Yarn or remove memory limitation check. If it used more memory than expected, there maybe memory leak or other exceptions.

@yangw1234 for TFpark related memory consumption.

yatinece commented 3 years ago

@qiyuangong , I think the container is taking a lot more memory than expected. I think it is not clearing the old GC when training is running and hence it run's out of memory. Any way I can test this or solve this?

qiyuangong commented 3 years ago

@qiyuangong , I think the container is taking a lot more memory than expected. I think it is not clearing the old GC when training is running and hence it run's out of memory. Any way I can test this or solve this?

Hi @yatinece . I don't think Yarn container took too much memory. Yarn container is only a resource unit for application. According to my experience, your training application is trying to consume more memory than it applied from Yarn, i.e., 39GB. So, Nodemanager killed this application by killing this container.

yatinece commented 3 years ago

@qiyuangong : Thanks, I increased the container memory to 79G also. We have a cap of 80GB, the only thing changed was that the model trained for 1 more epoch before the same error. So I think continuous training keep on increasing memory requirement. New to this , so may be using very generic terms.

jason-dai commented 3 years ago

@yatinece I wonder if there is a way for you to provide some sample code as well as the parameters (e.g., model size, data size, job parameters and configurations, etc.), so that we can try to replicate it on our side to understand why it needs large memory size.

In addition, an alternative is to try to restart the training from check-point if the job fails (instead of from scratch). @yangw1234 please provide some examples on how to do this.

yatinece commented 3 years ago

@jason-dai Following are details. Train data has 12.4 M rows.

Some parts of code @yangw1234 : if you can share the checkpoint logic or suggest another thing, I can test that as well.

############Library################
import argparse
import csv
import os
import time
import datetime
import numpy as np
import importlib
import tensorflow as tf
import pandas as pd
import pyspark.sql.functions as F
from bigdl.optim.optimizer import TrainSummary, ValidationSummary
from bigdl.optim.optimizer import *
from bigdl.optim.optimizer import *
from pyspark.sql.functions import when
from bigdl.util.common import init_engine
from bigdl.util.engine import prepare_env
from pyspark.sql import HiveContext, Row, SparkSession
from pyspark.sql.functions import concat, col, udf, lit,to_timestamp
from pyspark.sql.types import FloatType,DoubleType,ArrayType, IntegerType
from pyspark.ml.feature import StringIndexer,VectorAssembler,CountVectorizer
from zoo import init_nncontext, init_spark_conf
from zoo.orca.learn.tf.estimator import Estimator
from zoo.tfpark import TFOptimizer,KerasModel, TFDataset, TFPredictor

from bigdl.optim.optimizer import MaxEpoch, EveryEpoch
from zoo.pipeline.api.keras.metrics import Accuracy, BinaryAccuracy

from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
import numpy as np
import os
import time
import sys

from zoo.tfpark import KerasModel, TFDataset, TFPredictor
import tensorflow as tf
from keras import backend as K

############Embedding shape################
dict_size={'Feature1': (68, 34),
 'Feature2': (3, 2),
 'Feature3': (7, 4),
 'Feature4': (74, 37),
 'Feature5': (14, 7),
 'Feature6': (37, 19),
 'Feature7': (2, 1),
 'Feature8': (2, 1),
 'Feature9': (11, 6),
 'Feature10': (5, 3),
 'Feature11': (13, 7),
 'Feature12': (2, 1),
 'Feature13': (41, 21),
 'Feature14': (6, 3),
 'Feature15': (27, 14),
 'Feature16': (7, 4),
 'Feature17': (372, 50),
 'Feature18': (6690, 50),
 'Feature19': (3, 2),
 'Feature20': (3, 2),
 'Feature21': (2, 1),
 'Feature22': (3, 2),
 'Feature23': (2, 1),
 'Feature24': (2, 1),
 'Feature25': (4, 2),
 'Feature26': (2, 1),
 'Feature27': (2, 1),
 'Feature28': (2, 1)}
features = ['Feature1', 'Feature2',
'Feature3', 'Feature4', 'Feature5', 'Feature6','Feature7', 
'Feature8', 'Feature9', 'Feature10', 
 'Feature11', 'Feature12','Feature13','Feature14',
'Feature15','Feature16','Feature17', 
 'Feature18',             'Feature19','Feature20',
            'Feature21','Feature22','Feature23','Feature24','Feature25', 
'Feature26','Feature27' ,'Feature28']

dense_features =  ['Feature29','Feature30']

target_feature= 'label'
############model################
def create_model( catcols, dense_features_n, m):    
    inputs = []
    outputs = []
    outputs_dense = []
    categorical_output = []
    layers_dict={}
    iic=0
    for c in catcols:
        iic=iic+1
        num_unique_values,embed_dim = dict_size[c]
        layers_dict[c] = layers.Input(shape=(1,),name='input_'+str(iic))

        out = layers.Embedding(num_unique_values + 1, embed_dim)(layers_dict[c])
        out = layers.SpatialDropout1D(0.3)(out)
        out = layers.Reshape(target_shape=(embed_dim, ))(out)

        out_c = layers.Embedding(num_unique_values + 1, m)(layers_dict[c])
        out_c = layers.SpatialDropout1D(0.3)(out_c)
        out_c = layers.Reshape(target_shape=(m, ))(out_c)

        inputs.append(layers_dict[c])
        outputs.append(out)
        categorical_output.append(out_c)

    for c in (dense_features_n):
        iic=iic+1
        layers_dict[c] = layers.Input(shape=(1,),name='input_'+str(iic))
        inputs.append(layers_dict[c])
        outputs.append(layers_dict[c])

    x = layers.Concatenate()(outputs)
    x = layers.BatchNormalization()(x)

    x = layers.Dense(256, activation="relu")(x)
    x = layers.Dropout(0.3)(x)
    x = layers.BatchNormalization()(x)

    x = layers.Dense(256, activation="relu")(x)
    x = layers.Dropout(0.3)(x)
    x = layers.BatchNormalization()(x)

    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dropout(0.3)(x)
    x = layers.BatchNormalization()(x)

    deep_out = layers.Dropout(0.15)(x)

    first_order = [emb1 for emb1 in categorical_output]
    second_order = []

    for emb1, emb2 in combinations(categorical_output, 2):
        dot_layer = dot([emb1,  emb2], axes=1, normalize=False)
        second_order.append(dot_layer)

    first_order  = Add()(first_order)
    second_order = Add()(second_order)

    total_out = layers.Concatenate()([deep_out,first_order,second_order])
    total_out = layers.Dense(100, activation="relu")(total_out)
    y = layers.Dense(1, activation="sigmoid")(total_out)

    model = Model(inputs=inputs, outputs=y)
    model.compile(loss= 'binary_crossentropy' , optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5))

    return model

############Model size################

#Total params: 585,839
#Trainable params: 583,999
#Non-trainable params: 1,840

############Read data################
train_df=spark.read.parquet(loc)
test_df=spark.read.parquet(loc)

train_df=train_df.select(features+dense_features+[ target_feature])
test_df=test_df.select(features+dense_features+[ target_feature])

from pyspark.sql.functions import col

train_df=train_df.select(*(col(c).cast("float").alias(c) for c in train_df.columns))
test_df=test_df.select(*(col(c).cast("float").alias(c) for c in test_df.columns))

from pyspark.ml.feature import VectorAssembler
features_list=features+dense_features
ik=0
for col in features_list:
    ik=ik+1
    print(str(ik)+ ' print for vector')
    vectorAssembler_input_2 = VectorAssembler(inputCols=[col], outputCol='input_'+str(ik))
    train_df = vectorAssembler_input_2.transform(train_df)
ik=0
for col in features_list:
    ik=ik+1
    print(str(ik)+ ' print for vector')
    vectorAssembler_input_2 = VectorAssembler(inputCols=[col], outputCol='input_'+str(ik))
    test_df = vectorAssembler_input_2.transform(test_df)

print("start modeling train")

print("model name is "+model_name)

# create instance of model
print("remote model save path is "+ save_to_remote_dir)
print("*"*80)
os.system('hdfs dfs -rm -r ' + str(Final_model_save_to_remote_dir))
print(create_model( features, dense_features, 5).summary())
est_ck = Estimator.from_keras(create_model( features, dense_features, 5),model_dir=save_to_remote_dir)

est_ck.fit(data=train_df,batch_size=batch_size,epochs=max_epoch,feature_cols=[ 'input_'+str(k+1) for k in range(ik)],labels_cols=[ target_feature],validation_data= test_df,checkpoint_trigger=EveryEpoch())
yangw1234 commented 3 years ago

hi @yatinece here is an example on loading checkpoint

https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py#L186

yatinece commented 3 years ago

@yangw1234 : Let me try and let you know. Any other suggestion on why memory is being occupied?

yangw1234 commented 3 years ago

@yatinece we are still looking into the issue. Would you mind sharing your application resource configuration, like the number of nodes, the number of cores per node, node memory, etc?

yangw1234 commented 3 years ago

@yatinece it seems the problem is caused by a memory leak issue in intel optimized tensorflow. Can you try setting the TF_DISABLE_MKL environment variable to "1" on each spark exeuctor, to disable it?

Here is the code if you are using init_orca_context

init_orca_context(..., conf={"spark.executorEnv.TF_DISABLE_MKL": "1"})
yatinece commented 3 years ago

@yangw1234 : Thanks for this. i am using this to initiate the session :- ( will update once the job runs)

    sparkConf = init_spark_conf()
    sc = init_nncontext(sparkConf)

    spark = SparkSession \
    .builder \
    .appName(app_name) \
    .getOrCreate()

    init_engine()

I have now changed init_spark_conf() to init_spark_conf(conf={"spark.executorEnv.TF_DISABLE_MKL": "1"})

i have been using following settings. i am changing num-executors ,batch size and XX:ParallelGCThreads to see , if it can run .

spark-submit \ --conf spark.yarn.dist.archives=hdfs://xxx/venv.zip \ --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv.zip/venv/bin/python \ --conf spark.pyspark.python=venv.zip/venv/bin/python \ --conf spark.pyspark.driver.python=venv.zip/venv/bin/python \ --conf spark.executor.memoryOverhead=6096 \ --conf spark.driver.memoryOverhead=6096 \ --master yarn \ --deploy-mode cluster \ --conf spark.port.maxRetries=100 \ --conf spark.yarn.executor.memoryOverhead=5000000000 \ --executor-memory 33g \ --driver-memory 33g \ --executor-cores 2 \ --num-executors 160 \ --queue root.user.yyy \ --conf spark.executor.extraJavaOptions="-Xss512m" \ --conf spark.driver.extraJavaOptions="-Xss512m" \ --driver-java-options "-XX:MaxPermSize=2G -XX:+UseG1GC" \ --conf spark.driver.extraJavaOptions="-XX:ParallelGCThreads=28 -XX:+UseParallelGC -XX:+UseParallelOldGC " \ --conf spark.executor.extraJavaOptions="-XX:ParallelGCThreads=28 -XX:+UseParallelGC -XX:+UseParallelOldGC" \ --conf spark.sql.catalogImplementation=hive \ --conf spark.sql.shuffle.partitions=200 \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.ui.showConsoleProgress=true \ --conf spark.yarn.maxAppAttempts=1 \ --conf spark.locality.wait=2s \ --conf spark.driver.maxResultSize=10g \ --conf spark.sql.parquet.binaryAsString=true \ --conf spark.shuffle.consolidateFiles=false \ --conf spark.rdd.compress=true \ --conf spark.stage.maxConsecutiveAttempts=30 \ --jars hdfs://xxx/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0.jar \ --files hdfs://xxx/spark-analytics-zoo.conf \ --py-files hdfs://xxx/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip hdfs://xxx4/abc_v1.py

yatinece commented 3 years ago

@yangw1234 :- The suggested setting is working. Thanks for this.

For checkpoint suggestion:- i implemented a code like this : -

    max_epoch=15
    step=5
    for k in range(max_epoch//step): 

        tf.reset_default_graph()
        model=create_model( features, dense_features, 5)
        est_ck = Estimator.from_keras(keras_model=model,model_dir=save_to_remote_dir)
        if k >0 : est_ck.load_latest_orca_checkpoint(save_to_remote_dir)
        est_ck.fit(data=train_df,batch_size=batch_size,epochs=step,feature_cols=[ 'input_'+str(k+1) for k in range(ik)],labels_cols=[ target_feature],validation_data= test_df,checkpoint_trigger=SeveralIteration(4))
        eval_result = est_ck.evaluate(test_df,feature_cols=[ 'input_'+str(k+1) for k in range(ik)],labels_cols=[ target_feature],hard_code_batch_size=False)
        print(eval_result)
        print('scoring the model' +"_epoch_"+str(k))

output :-

{'loss': 0.5859652161598206}
scoring the model_epoch_0
Traceback (most recent call last):
  File "xxxx.py", line 323, in <module>
    if k >0 : est_ck.load_latest_orca_checkpoint(save_to_remote_dir)
  File "/xxxx/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.9.0-python-api.zip/zoo/orca/learn/tf/estimator.py", line 45, in load_latest_orca_checkpoint
Exception: Cannot find checkpoint

It is not saving the checkpoint to be loaded. Do you see any mistake : -

yangw1234 commented 3 years ago

@yatinece Sorry for the late response. I was on leave for the last few days. The save_to_remte_dir variable, is it a remote path like “hdfs://xxxx” or a local file system path? load_latest_orca_checkpoint does not support "hdfs://" yet, we are currently working on it.

yatinece commented 3 years ago

@yangw1234 :- I am running the codes via cluster mode (--deploy-mode cluster)in spark. So I think in this case the checkpoint will not work. Do I need to move this to client mode to save the file in the local system?

yangw1234 commented 3 years ago

@yatinece we are working on the hdfs support https://github.com/intel-analytics/analytics-zoo/pull/3091

helenlly commented 3 years ago

@yangw1234 since intel-analytics/analytics-zoo#3091 is already merged, any suggestion for next step?

yangw1234 commented 3 years ago

this issue has been fixed

jason-dai commented 3 years ago

@yangw1234 @jenniew This is the previous memory leak issue; is it fixed?

jenniew commented 3 years ago

I'm still verifying if the intel mkl patch can fix memory leak issue.