intel-analytics / ipex-llm


Exception happened if using orca estimator train pytorch model with xshards #4965

Open dding3 opened 2 years ago

dding3 commented 2 years ago

The exception is:

An error occurred while calling o59.estimatorTrain.
: com.intel.analytics.bigdl.dllib.utils.InvalidOperationException
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.invalidOperationError(Log4Error.scala:38)
    at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:1161)
    at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:1302)
    at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:969)
    at com.intel.analytics.bigdl.dllib.estimator.Estimator.train(Estimator.scala:192)
    at com.intel.analytics.bigdl.dllib.estimator.python.PythonEstimator.estimatorTrain(PythonEstimator.scala:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intel.analytics.bigdl.dllib.keras.layers.utils.KerasUtils$.invokeMethod(KerasUtils.scala:317)
    at com.intel.analytics.bigdl.dllib.keras.layers.utils.KerasUtils$.invokeMethodWithEv(KerasUtils.scala:344)
    at com.intel.analytics.bigdl.dllib.keras.models.InternalOptimizerUtil$.optimizeModels(Topology.scala:886)
    at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:1089)
    ... 15 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 17, localhost, executor driver): com.intel.analytics.bigdl.dllib.utils.UnKnownException: com.intel.analytics.bigdl.dllib.utils.InvalidOperationException: only support tensor input
    at com.intel.analytics.bigdl.dllib.utils.Log4Error$.unKnowExceptionError(Log4Error.scala:60)
    at com.intel.analytics.bigdl.dllib.utils.ThreadPool.invokeAndWait2(ThreadPool.scala:175)
    at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4.apply(DistriOptimizer.scala:261)
    at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4.apply(DistriOptimizer.scala:220)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.intel.analytics.bigdl.dllib.utils.InvalidOperationException: only support tensor input
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)

It can be reproduced with the code below:

# imports needed by the repro
from torch.nn import Module, Linear, ReLU, Sigmoid, BCELoss
from torch.nn.init import kaiming_uniform_, xavier_uniform_
from torch.optim import SGD

from bigdl.orca import init_orca_context
import bigdl.orca.data.pandas
from bigdl.orca.learn.pytorch import Estimator
from bigdl.orca.learn.metrics import Accuracy

# model definition
class MLP(Module):
    # define model elements
    def __init__(self, n_inputs):
        super(MLP, self).__init__()
        # input to first hidden layer
        self.hidden1 = Linear(n_inputs, 10)
        kaiming_uniform_(self.hidden1.weight, nonlinearity='relu')
        self.act1 = ReLU()
        # second hidden layer
        self.hidden2 = Linear(10, 8)
        kaiming_uniform_(self.hidden2.weight, nonlinearity='relu')
        self.act2 = ReLU()
        # third hidden layer and output
        self.hidden3 = Linear(8, 1)
        xavier_uniform_(self.hidden3.weight)
        self.act3 = Sigmoid()

    # forward propagate input
    def forward(self, X):
        # input to first hidden layer
        X = self.hidden1(X)
        X = self.act1(X)
        # second hidden layer
        X = self.hidden2(X)
        X = self.act2(X)
        # third hidden layer and output
        X = self.hidden3(X)
        X = self.act3(X)
        return X

init_orca_context(memory="4g")

path = PATH
data_shard = bigdl.orca.data.pandas.read_csv(path)

def getSchema(iter):
    for pdf in iter:
        return [pdf.columns.values]

column = data_shard.rdd.mapPartitions(getSchema).first()
from bigdl.orca.data.transform import LabelEncode
label_encoder = LabelEncode(inputCol=column[-1], outputCol="indexedLabel")
data_shard = label_encoder.fit_transform(data_shard)
data_shard.rdd.count()

# define the network
model = MLP(34)

criterion = BCELoss()
optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)

orca_estimator = Estimator.from_torch(model=model,
                                      optimizer=optimizer,
                                      loss=criterion,
                                      metrics=[Accuracy()],
                                      backend="bigdl")

orca_estimator.fit(data=data_shard, epochs=8,
                   feature_cols=list(column[:-1]), label_cols=["indexedLabel"], batch_size=4)

The data can be downloaded from Almaren-Gateway:/mnt/md0/home/ding.ding/new_ionosphere.csv

jason-dai commented 2 years ago

Does the bigdl backend not support XShards? @qiuxin2012

qiuxin2012 commented 2 years ago

> Does the bigdl backend not support XShards? @qiuxin2012

The bigdl backend only supports FeatureSet; is XShards a FeatureSet?

jason-dai commented 2 years ago

@yushan111 please take a look

shanyu-sys commented 2 years ago

In Orca Estimators, we assume feature_cols specifies multiple inputs to the model, rather than different columns that can be stacked to serve as a single model input. It seems your model only accepts one input, so there are two options:

  • Feed an XShards of dictionaries {'x': feature, 'y': label} to the estimator. You could use transform_shards to convert each data partition into a dictionary with one ndarray of features (shape (n, 8)) and one ndarray of labels (shape (n,)).
  • Feed an XShards of pandas DataFrames to the estimator. The DataFrame should contain only one column, and each item in that column should be an ndarray of shape (8,).
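
For reference, a rough, untested sketch of option 1: it assumes the shards hold pandas DataFrames, reuses the column list and the "indexedLabel" column from the repro script above, and assumes transform_shard as the XShards method name (please check the XShards docs for the exact API):

import numpy as np

def to_xy_dict(df):
    # stack all feature columns into one float ndarray
    x = df[list(column[:-1])].to_numpy(dtype=np.float32)
    # BCELoss compares against the sigmoid output of shape (n, 1),
    # so the label may need reshaping to (n, 1)
    y = df["indexedLabel"].to_numpy(dtype=np.float32).reshape(-1, 1)
    return {"x": x, "y": y}

data_shard = data_shard.transform_shard(to_xy_dict)
orca_estimator.fit(data=data_shard, epochs=8, batch_size=4)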

dding3 commented 2 years ago

> In Orca Estimators, we assume feature_cols specifies multiple inputs to the model, rather than different columns that can be stacked to serve as a single model input. It seems your model only accepts one input, so there are two options:
>
>   • Feed an XShards of dictionaries {'x': feature, 'y': label} to the estimator. You could use transform_shards to convert each data partition into a dictionary with one ndarray of features (shape (n, 8)) and one ndarray of labels (shape (n,)).
>   • Feed an XShards of pandas DataFrames to the estimator. The DataFrame should contain only one column, and each item in that column should be an ndarray of shape (8,).

Thank you for the investigation. Is it possible to add logic before training that checks and, if necessary, converts the multiple columns into one when training on an XShards of pandas DataFrames?

shanyu-sys commented 2 years ago

> Thank you for the investigation. Is it possible to add logic before training that checks and, if necessary, converts the multiple columns into one when training on an XShards of pandas DataFrames?

Sure, great suggestion! It should be more convenient for users.
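
Roughly, the pre-training conversion could look something like the sketch below. This is only an illustration of the idea, assuming pandas DataFrame shards; it is not the actual estimator implementation, and maybe_stack_features is a hypothetical name:

import numpy as np

def maybe_stack_features(df, feature_cols, label_cols):
    # hypothetical conversion: when several scalar columns are given for a
    # single-input model, stack them into one feature ndarray; otherwise
    # keep the single, already array-valued column as-is
    first = df[feature_cols[0]].iloc[0]
    if len(feature_cols) > 1 and not isinstance(first, np.ndarray):
        x = df[list(feature_cols)].to_numpy(dtype=np.float32)
    else:
        x = np.stack(df[feature_cols[0]].to_list())
    y = df[list(label_cols)].to_numpy(dtype=np.float32)
    return {"x": x, "y": y}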

jason-dai commented 2 years ago

How do we know if the model expects just one input or multiple inputs? I think specifying multiple columns in feature_cols is for the latter.

shanyu-sys commented 2 years ago

@jason-dai @dding3 Maybe a more common case is stacking multiple columns into a single tensor and passing it to a single-input model. In this case, with the current API, users need to write a transform function. Should we optimize this case?

sgwhat commented 2 years ago

> @jason-dai @dding3 Maybe a more common case is stacking multiple columns into a single tensor and passing it to a single-input model. In this case, with the current API, users need to write a transform function. Should we optimize this case?

Maybe we could move the transform logic into the estimator, so that users could simply specify multiple columns in feature_cols.

jason-dai commented 2 years ago

> @jason-dai @dding3 Maybe a more common case is stacking multiple columns into a single tensor and passing it to a single-input model. In this case, with the current API, users need to write a transform function. Should we optimize this case?

How do you plan to support multi-input models then?

shanyu-sys commented 2 years ago

> How do you plan to support multi-input models then?

We could stick to the current design and add more documentation to guide users in writing the transform function.

If we do want to optimize this case through an interface change, maybe we could use a nested list in feature_cols to support multi-input models, e.g. feature_cols = [["f1", "f2"], ["f3", "f4"]]; internally we would stack ["f1", "f2"] into one tensor input and ["f3", "f4"] into another. It is just a rough design idea.
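
A rough illustration of how the nested feature_cols might be interpreted internally (hypothetical names, sketch only, assuming pandas DataFrame shards):

import numpy as np

# hypothetical nested feature_cols: each inner list becomes one model input
feature_cols = [["f1", "f2"], ["f3", "f4"]]

def stack_nested_inputs(df, nested_cols):
    # one stacked ndarray per inner list -> one tensor per model input
    return [df[cols].to_numpy(dtype=np.float32) for cols in nested_cols]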