intel-analytics / analytics-zoo

Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray
https://analytics-zoo.readthedocs.io/
Apache License 2.0

java.lang.StackOverflowError raised when using orca bigdl backend #435

Closed leonardozcm closed 3 years ago

leonardozcm commented 3 years ago

py4j.protocol.Py4JJavaError: An error occurred while calling o51.estimatorTrain.: java.lang.StackOverflowError occurred when I was trying to migrate Attention to orca.

Traceback (most recent call last):
  File "/home/arda/PycharmProjects/inception/main.py", line 93, in <module>
    epochs=1)
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/orca/learn/bigdl/estimator.py", line 150, in fit
    val_feature_set, validation_metrics, batch_size)
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/pipeline/estimator/estimator.py", line 146, in train
    validation_method, batch_size)
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/common/utils.py", line 133, in callZooFunc
    raise e
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/common/utils.py", line 127, in callZooFunc
    java_result = api(*args)
  File "/home/arda/Public/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/arda/Public/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.estimatorTrain.
: java.lang.StackOverflowError
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        .....
leonardozcm commented 3 years ago
from zoo.pipeline.api.keras.models import Model
from zoo.pipeline.api.keras.layers import *

import argparse
import numpy as np
from tensorflow.python.keras.datasets import imdb
from tensorflow.python.keras.preprocessing import sequence
from zoo.orca import init_orca_context, stop_orca_context

from zoo.orca.learn.bigdl.estimator import Estimator
from zoo.orca.data import XShards

parser = argparse.ArgumentParser()
parser.add_argument('--cluster_mode', type=str, default="local",
                    help='The mode for the Spark cluster. local or yarn.')
args = parser.parse_args()
cluster_mode = args.cluster_mode
sc = None
if cluster_mode == "local":
    sc = init_orca_context(cluster_mode="local", cores=4, memory="10g")
elif cluster_mode == "yarn":
    sc = init_orca_context(cluster_mode="yarn-client", num_nodes=2, cores=2, driver_memory="3g",
                           # conf={"spark.executor.extraJavaOptions": "-Xss512m",
                           #       "spark.driver.extraJavaOptions": "-Xss512m"}
                           )

max_features = 20000
max_len = 200

print('Loading data...')
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')

print('Pad sequences (samples x time)')
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

train_pos = np.zeros((len(x_train), max_len), dtype=np.int32)
val_pos = np.zeros((len(x_test), max_len), dtype=np.int32)
for i in range(0, len(x_train)):
    train_pos[i, :] = np.arange(max_len)
    val_pos[i, :] = np.arange(max_len)

# def build_sample(token_id, position_id, label):
#     samples = []
#     for i in range(label.shape[0]):
#         sample = Sample.from_ndarray([token_id[i], position_id[i]], np.array(label[i]))
#         samples.append(sample)
#     return samples
# 
# 
# train_samples = build_sample(x_train, train_pos, y_train)
# val_samples = build_sample(x_test, val_pos, y_test)
# train_rdd = sc.parallelize(train_samples)
# val_rdd = sc.parallelize(val_samples)

train_dataset = XShards.partition({"x": (x_train, train_pos), "y": np.array(y_train)})
val_dataset = XShards.partition({"x": (x_test, val_pos), "y": np.array(y_test)})

token_shape = (max_len,)
position_shape = (max_len,)
token_input = Input(shape=token_shape)
position_input = Input(shape=position_shape)
O_seq = TransformerLayer.init(
    vocab=max_features, hidden_size=128, n_head=8, seq_len=max_len)([token_input, position_input])
# Select the first output of the Transformer. The second is the pooled output.
O_seq = SelectTable(0)(O_seq)
O_seq = GlobalAveragePooling1D()(O_seq)
O_seq = Dropout(0.2)(O_seq)
outputs = Dense(2, activation='softmax')(O_seq)

model = Model([token_input, position_input], outputs).to_model()
model.summary()

model.compile(optimizer=Adam(),
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

batch_size = 128
print('Train...')

est = Estimator.from_bigdl(model=model)
est.fit(data=train_dataset,
        batch_size=batch_size,
        epochs=1)
print("Train finished.")

print('Evaluating...')
result = est.evaluate(val_dataset)
print(result)

print("finished...")
stop_orca_context()
yangw1234 commented 3 years ago

After removing the TransformerLayer, it works, so I think the issue is related to the TransformerLayer.

@dding3 is there something special about this TransformerLayer?

@jason-dai This example uses zoo.pipeline.api.keras.models. Do you prefer to keep using this, or switch to tf.keras?

from zoo.pipeline.api.keras.models import Model
from zoo.pipeline.api.keras.layers import *
from zoo.pipeline.api.keras.objectives import SparseCategoricalCrossEntropy

import argparse
import numpy as np
from tensorflow.python.keras.datasets import imdb
from tensorflow.python.keras.preprocessing import sequence
from zoo.orca import init_orca_context, stop_orca_context

from zoo.orca.learn.bigdl.estimator import Estimator
from zoo.orca.data import XShards

parser = argparse.ArgumentParser()
parser.add_argument('--cluster_mode', type=str, default="local",
                    help='The mode for the Spark cluster. local or yarn.')
args = parser.parse_args()
cluster_mode = args.cluster_mode
sc = None
if cluster_mode == "local":
    sc = init_orca_context(cluster_mode="local", cores=4, memory="10g")
elif cluster_mode == "yarn":
    sc = init_orca_context(cluster_mode="yarn-client", num_nodes=2, cores=2, driver_memory="3g",
                           # conf={"spark.executor.extraJavaOptions": "-Xss512m",
                           #       "spark.driver.extraJavaOptions": "-Xss512m"}
                           )

max_features = 20000
max_len = 200

print('Loading data...')
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')

print('Pad sequences (samples x time)')
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

train_pos = np.zeros((len(x_train), max_len), dtype=np.int32)
val_pos = np.zeros((len(x_test), max_len), dtype=np.int32)
for i in range(0, len(x_train)):
    train_pos[i, :] = np.arange(max_len)
    val_pos[i, :] = np.arange(max_len)

train_dataset = XShards.partition({"x": (x_train, train_pos), "y": np.array(y_train)})
val_dataset = XShards.partition({"x": (x_test, val_pos), "y": np.array(y_test)})

token_shape = (max_len,)
position_shape = (max_len,)
token_input = Input(shape=token_shape)
position_input = Input(shape=position_shape)
# O_seq = TransformerLayer.init(
#     vocab=max_features, hidden_size=128, n_head=8, seq_len=max_len)([token_input, position_input])
# Select the first output of the Transformer. The second is the pooled output.
# O_seq = SelectTable(0)(O_seq)
# O_seq = GlobalAveragePooling1D()(O_seq)
# O_seq = Dropout(0.2)(O_seq)
O_seq = merge([token_input, position_input], mode="concat")
outputs = Dense(2, activation='softmax')(O_seq)

model = Model([token_input, position_input], outputs)
batch_size = 128
print('Train...')

est = Estimator.from_bigdl(model=model, loss=SparseCategoricalCrossEntropy(), optimizer=Adam())
est.fit(data=train_dataset,
        batch_size=batch_size,
        epochs=1)
print("Train finished.")

print('Evaluating...')
result = est.evaluate(val_dataset)
print(result)

print("finished...")
stop_orca_context()
jason-dai commented 3 years ago

I think you need to increase the Java stack size to avoid the StackOverflow exception; see https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/zoo/examples/attention/README.md
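
For reference, a minimal sketch of that fix using the orca API; the -Xss512m values simply mirror the conf lines that are commented out in the reproduction script above, and the right size may need tuning:

from zoo.orca import init_orca_context

# Pass a larger thread stack size to the driver and executor JVMs so that the
# deeply nested serialization shown in the stack trace does not overflow the stack.
sc = init_orca_context(cluster_mode="yarn-client", num_nodes=2, cores=2, driver_memory="3g",
                       conf={"spark.executor.extraJavaOptions": "-Xss512m",
                             "spark.driver.extraJavaOptions": "-Xss512m"})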

leonardozcm commented 3 years ago

I think you need to increase the Java stack size to avoid the StackOverflow exception; see https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/zoo/examples/attention/README.md

Yes, it is caused by the Java stack size. Thanks a lot.

leonardozcm commented 3 years ago

After the stack size issue was fixed, another error came up:

Traceback (most recent call last):
  File "/home/arda/PycharmProjects/inception/main.py", line 77, in <module>
    epochs=1)
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/orca/learn/bigdl/estimator.py", line 151, in fit
    val_feature_set, validation_metrics, batch_size)
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/pipeline/estimator/estimator.py", line 146, in train
    validation_method, batch_size)
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/common/utils.py", line 133, in callZooFunc
    raise e
  File "/home/arda/Project/analytics-zoo/pyzoo/zoo/common/utils.py", line 127, in callZooFunc
    java_result = api(*args)
  File "/home/arda/Public/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/arda/Public/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.estimatorTrain.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 16, localhost, executor driver): Layer info: Model[a1539646]
java.lang.IllegalArgumentException: Tensor cannot be cast to Table
    at com.intel.analytics.bigdl.tensor.Tensor$class.toTable(Tensor.scala:747)
    at com.intel.analytics.bigdl.tensor.DenseTensor.toTable(DenseTensor.scala:33)
    at com.intel.analytics.bigdl.nn.Graph.getInput(Graph.scala:293)
    at com.intel.analytics.bigdl.nn.Graph.findInput(Graph.scala:301)
    at com.intel.analytics.bigdl.nn.StaticGraph.updateOutput(StaticGraph.scala:60)
    at com.intel.analytics.bigdl.nn.keras.KerasLayer.updateOutput(KerasLayer.scala:272)
    at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:282)
    at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply$mcI$sp(DistriOptimizer.scala:269)
    at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:260)
    at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:260)
    at com.intel.analytics.bigdl.utils.ThreadPool$$anonfun$1$$anon$5.call(ThreadPool.scala:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Similar to https://github.com/intel-analytics/analytics-zoo-internal/issues/932

hkvision commented 3 years ago

The input to the Transformer should be a table, I suppose. Is your input correct?

leonardozcm commented 3 years ago

Actually, when the length of the inputs does not equal 1, the getInput function calls toTable, which is not supported on DenseTensor (or Tensor in general); see:

yangw1234 commented 3 years ago

It seems we do not support multi-input models in the bigdl estimator when the data is XShards.
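
For what it's worth, the first script above already had a Sample-based alternative commented out; a hedged sketch of that approach is below, continuing from the variables defined in that script (the Sample import path is assumed from BigDL's Python API, and whether the bigdl estimator accepts such an RDD directly was not confirmed in this thread):

import numpy as np
from bigdl.util.common import Sample

def build_sample(token_id, position_id, label):
    # Each Sample carries both model inputs (token ids and position ids)
    # together with the corresponding label.
    samples = []
    for i in range(label.shape[0]):
        samples.append(Sample.from_ndarray([token_id[i], position_id[i]], np.array(label[i])))
    return samples

train_rdd = sc.parallelize(build_sample(x_train, train_pos, y_train))
val_rdd = sc.parallelize(build_sample(x_test, val_pos, y_test))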

yangw1234 commented 3 years ago

This has been fixed by intel-analytics/analytics-zoo#3439.