intel-analytics / ipex-llm

Accelerate local LLM inference and finetuning (LLaMA, Mistral, ChatGLM, Qwen, Baichuan, Mixtral, Gemma, Phi, MiniCPM, etc.) on Intel XPU (e.g., local PC with iGPU and NPU, discrete GPU such as Arc, Flex and Max); seamlessly integrate with llama.cpp, Ollama, HuggingFace, LangChain, LlamaIndex, GraphRAG, DeepSpeed, vLLM, FastChat, Axolotl, etc.
Apache License 2.0
6.51k stars 1.24k forks source link

Orca tf1 train gets No route to host error when launching from remote on k8s #3725

Open hkvision opened 2 years ago

hkvision commented 2 years ago

Run on Almaren-Node-188 cd kai conda activate orca-demo export PYSPARK_PYTHON=/usr/local/envs/pytf1/bin/python python ncf_orca.py

get the error below when the training is finished:

2021-12-21 09:47:30 INFO  DistriOptimizer$:431 - [Epoch 2 80000/80668][Iteration 21][Wall Clock 10.99408137s] Trained 8000.0 records in 0.341889744 seconds. Throughput is 23399.357 records/second. Loss is 1.6157264.
2021-12-21 09:47:30 INFO  DistriOptimizer$:431 - [Epoch 2 88000/80668][Iteration 22][Wall Clock 11.309833195s] Trained 8000.0 records in 0.315751825 seconds. Throughput is 25336.354 records/second. Loss is 1.2710276.
2021-12-21 09:47:30 INFO  DistriOptimizer$:473 - [Epoch 2 88000/80668][Iteration 22][Wall Clock 11.309833195s] Epoch finished. Wall clock time is 11328.070709 ms
2021-12-14 14:04:46 ERROR RetryingBlockFetcher:155 - Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /172.30.96.12:34942
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:287)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:123)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:153)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:133)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:143)
        at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
        at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1061)
        at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1005)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1005)
        at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1143)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:88)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:63)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No route to host: /A.B.C.D:X 
Caused by: java.net.NoRouteToHostException: No route to host
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

py4j.protocol.Py4JJavaError: An error occurred while calling o88.estimatorTrainMiniBatch.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 851.0 failed 4 times, most recent failure: Lost task 0.3 in stage 851.0 (TID 438) (172.30.96.12 executor 1): TaskResultLost (result lost from block manager)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
        at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
        at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1463)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.first(RDD.scala:1463)
        at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer$.$anonfun$getModel$4(Topology.scala:1471)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
        at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer$.getModel(Topology.scala:1470)
        at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:987)
        at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:1123)
        at com.intel.analytics.bigdl.dllib.keras.models.InternalDistriOptimizer.train(Topology.scala:793)
        at com.intel.analytics.bigdl.dllib.estimator.Estimator.train(Estimator.scala:191)
        at com.intel.analytics.bigdl.dllib.estimator.python.PythonEstimator.estimatorTrainMiniBatch(PythonEstimator.scala:119)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Seems 172.30.96.12 is an internal ip. Previously there is no error, not sure what has happened during the past week for the docker image and the k8s cluster. cc @pinggao18

hkvision commented 2 years ago

Using spark-submit also gets the same error:

bin/spark-submit \
--master k8s://https://A.B.C.D:X \
--deploy-mode client \
--conf spark.driver.host=172.16.0.170 \
--conf spark.driver.port=54321 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=10.239.45.10/arda/intelanalytics/bigdl-k8s-spark-3.1.2:0.14.0-SNAPSHOT \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfsvolumeclaim.options.claimName=nfsvolumeclaim \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfsvolumeclaim.mount.path=/bigdl2.0/data \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.nfsvolumeclaim.options.claimName=nfsvolumeclaim \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.nfsvolumeclaim.mount.path=/bigdl2.0/data \
--conf spark.kubernetes.executor.deleteOnTermination=false \
--conf spark.kubernetes.driverEnv.http_proxy=http://A.B.C.D:X  \
--conf spark.kubernetes.driverEnv.https_proxy=http://A.B.C.D:X  \
--conf spark.kubernetes.executorEnv.http_proxy=http://A.B.C.D:X  \
--conf spark.kubernetes.executorEnv.https_proxy=http://A.B.C.D:X  \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.pyspark.driver.python=/root/anaconda3/envs/orca-demo/bin/python \
--conf spark.pyspark.python=/usr/local/envs/pytf1/bin/python \
--conf spark.executorEnv.PYTHONHOME=/usr/local/envs/pytf1 \
--executor-cores 4 \
--executor-memory 50g \
--total-executor-cores 16 \
--driver-cores 4 \
--driver-memory 50g \
--properties-file /opt/bigdl-0.14.0-SNAPSHOT/conf/spark-bigdl.conf \
--conf spark.driver.extraJavaOptions=-Dderby.stream.error.file=/tmp \
--conf spark.sql.catalogImplementation='in-memory' \
--conf spark.driver.extraClassPath=local:///opt/bigdl-0.14.0-SNAPSHOT/jars/* \
--conf spark.executor.extraClassPath=local:///opt/bigdl-0.14.0-SNAPSHOT/jars/* \
/root/kai/ncf_orca.py

Rolling back the docker image to 1122 also fails. Tried on 188 and 170 and both gets the same error.

hkvision commented 2 years ago

basic_text_classification can run successfully. After further test, the issue occurs when the embedding is (relatively) large; for the basic text classification example, vocab size is 10000, if I change to 100000, it fails... Enlarging memory doesn't resolve this issue. The error raises from https://github.com/intel-analytics/BigDL/blob/0495db13f1ab6ab38e2cc650ea4f2617bae8b688/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/keras/models/Topology.scala#L1471 when getModel after the training:

      (0 until extraParamLength).foreach(i =>
        extraState(i) = models.map(_.localModels.head.getExtraParameter()(i)).first()
      )

Seems the block manager get killed when trying to fetch the extra parameters? The only task error I can find from the webUI is TaskResultLost (result lost from block manager) Any ideas @qiuxin2012 @yangw1234 @jason-dai

hkvision commented 2 years ago

Running in cluster node from remote is successful:

bin/spark-submit --master k8s://https://172.16.0.200:6443 --deploy-mode cluster --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --name bigdl2-basic_text_classification --conf spark.kubernetes.container.image=10.239.45.10/arda/intelanalytics/bigdl-k8s-spark-3.1.2:0.14.0-SNAPSHOT --conf spark.executor.instances=1 --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfsvolumeclaim.options.claimName=nfsvolumeclaim --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.nfsvolumeclaim.mount.path=/bigdl2.0/data --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.nfsvolumeclaim.options.claimName=nfsvolumeclaim --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.nfsvolumeclaim.mount.path=/bigdl2.0/data --conf spark.kubernetes.driver.label.az=true --conf spark.kubernetes.executor.label.az=true --conf spark.kubernetes.node.selector.spark=true --conf spark.kubernetes.driverEnv.http_proxy=http://child-prc.intel.com:913 --conf spark.kubernetes.driverEnv.https_proxy=http://child-prc.intel.com:913 --conf spark.kubernetes.executorEnv.http_proxy=http://child-prc.intel.com:913 --conf spark.kubernetes.executorEnv.https_proxy=http://child-prc.intel.com:913 --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.pyspark.driver.python=/usr/local/envs/pytf1/bin/python --conf spark.pyspark.python=/usr/local/envs/pytf1/bin/python --conf spark.executorEnv.PYTHONHOME=/usr/local/envs/pytf1 --executor-cores 4 --executor-memory 50g --total-executor-cores 16 --driver-cores 4 --driver-memory 50g --properties-file /opt/bigdl-0.14.0-SNAPSHOT/conf/spark-bigdl.conf --py-files local:///opt/bigdl-0.14.0-SNAPSHOT/python/bigdl-spark_3.1.2-0.14.0-SNAPSHOT-python-api.zip,local:///bigdl2.0/data/kai/ncf_orca.py --conf spark.driver.extraJavaOptions=-Dderby.stream.error.file=/tmp --conf spark.sql.catalogImplementation='in-memory' --conf spark.driver.extraClassPath=local:///opt/bigdl-0.14.0-SNAPSHOT/jars/* --conf spark.executor.extraClassPath=local:///opt/bigdl-0.14.0-SNAPSHOT/jars/* local:///bigdl2.0/data/kai/ncf_orca.py
jason-dai commented 2 years ago

1) Looks to me like a memory issue; which memory did you increase? 2) Can you try running client mode on the master mode? 3) Is there is way to monitor the status of each executor node and the driver node? 4) We probably shouldn't use first:

   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
hkvision commented 2 years ago
  1. Looks to me like a memory issue; which memory did you increase?
  2. Can you try running client mode on the master mode?
  3. Is there is way to monitor the status of each executor node and the driver node?
  4. We probably shouldn't use first:
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
  1. I increased both the driver memory and executor memory to 50G and 120G but still doesn't work. Doubt whether it is related to memory... probably the memory issue may not be the root cause since embedding of this size isn't that large...
  2. I tried on Almaren-200 and it works. It also works on cluster mode from remote.
  3. Need some investigation.
  4. I will check the code. Also at the same time @pinggao187 is setting up a new cluster for test.
jason-dai commented 2 years ago
  1. Looks to me like a memory issue; which memory did you increase?
  2. Can you try running client mode on the master mode?
  3. Is there is way to monitor the status of each executor node and the driver node?
  4. We probably shouldn't use first:
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
  1. I increased both the driver memory and executor memory to 50G and 120G but still doesn't work. Doubt whether it is related to memory... probably the memory issue may not be the root cause since embedding of this size isn't that large...

Is it related to K8s memory config for the containers?

  1. I tried on Almaren-200 and it works. It also works on cluster mode from remote.

In both cases, the drive is running in the K8s cluster, yes?

  1. Need some investigation.
  2. I will check the code. Also at the same time @pinggao187 is setting up a new cluster for test.
hkvision commented 2 years ago

Is it related to K8s memory config for the containers?

No memory constraint from the cluster according to @pinggao187

In both cases, the drive is running in the K8s cluster, yes?

Yes... So I doubt whether the issue is related to the internal ip (same as RayOnSpark), but don't know why basic text classification is successful and also I succeeded previously...

hkvision commented 2 years ago
  1. Looks to me like a memory issue; which memory did you increase?
  2. Can you try running client mode on the master mode?
  3. Is there is way to monitor the status of each executor node and the driver node?
  4. We probably shouldn't use first:
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.

For 4, checked the code, optimizer.optimize will return the trained model, not only first is called for extra parameters, but also reduce is called for weights and gradients. Tried dllib keras also gets this error.

jason-dai commented 2 years ago

Yes... So I doubt whether the issue is related to the internal ip (same as RayOnSpark), but don't know why basic text classification is successful and also I succeeded previously...

See https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L165

If the result of a Task is small, it will be directly sent back to the driver; otherwise, it will be stored in BlockManager and the driver need to fetch it from BlockManager. Therefore the behavior will be different when embedding size becomes larger.

hkvision commented 2 years ago

Yes... So I doubt whether the issue is related to the internal ip (same as RayOnSpark), but don't know why basic text classification is successful and also I succeeded previously...

See https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L165

If the result of a Task is small, it will be directly sent back to the driver; otherwise, it will be stored in BlockManager and the driver need to fetch it from BlockManager. Therefore the behavior will be different when embedding size becomes larger.

After setting "spark.task.maxDirectResultSize": "100000000" in conf, can run on the small dataset with ~20w items. But I think this is just a workaround? The main issue is why the data in the BlockManager gets lost unexpectedly?

hkvision commented 2 years ago

@jason-dai You are right. Now the root cause is the driver can't connect to the block manager. Previously I succeeded on ml-1m dataset, which only has 3000+ items... while ml-latest-small dataset has far fewer records but actually has far more items... Sorry that I overlooked this... I tried collecting some data to the driver and it gets the same error as well. Looking into how to solve this issue.

hkvision commented 2 years ago

This issue should be the same as #3605