intel-analytics / ipex-llm

Accelerate local LLM inference and finetuning (LLaMA, Mistral, ChatGLM, Qwen, Mixtral, Gemma, Phi, MiniCPM, Qwen-VL, MiniCPM-V, etc.) on Intel XPU (e.g., local PC with iGPU and NPU, discrete GPU such as Arc, Flex and Max); seamlessly integrate with llama.cpp, Ollama, HuggingFace, LangChain, LlamaIndex, vLLM, GraphRAG, DeepSpeed, Axolotl, etc
Apache License 2.0

Getting jep.JepException: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds] when starting training with the Orca PyTorch Estimator (BigDL backend) #5560

Open amardeepjaiman opened 2 years ago

amardeepjaiman commented 2 years ago

Hi, I am trying to run the Fashion MNIST sample code from the BigDL repo in an Azure Databricks Spark cluster environment. The sample code is here: https://github.com/intel-analytics/BigDL/blob/main/python/orca/colab-notebook/examples/fashion_mnist_bigdl.ipynb

Cluster Configuration:

I have 1 Azure D4_V5-based driver node and 2 Azure Standard D4_V5-based worker nodes set up in my Spark cluster. Azure Databricks Runtime: 9.1 LTS ML (Scala 2.12, Spark 3.1.2).

The Spark configuration is below:

spark.executorEnv.PYTHONHOME /databricks/python3/lib/python3.8
spark.serializer org.apache.spark.serializer.JavaSerializer
spark.executorEnv.KMP_BLOCKTIME 0
spark.databricks.delta.preview.enabled true
spark.rpc.message.maxSize 2047
spark.executor.cores 3
spark.executor.memory 8g
spark.files.fetchTimeout 100000s
spark.network.timeout 100000s
spark.databricks.conda.condaMagic.enabled true
spark.driver.memory 8g
spark.scheduler.minRegisteredResourcesRatio 1.0
spark.scheduler.maxRegisteredResourcesWaitingTime 60s
spark.executor.heartbeatInterval 1000000
spark.cores.max 6
spark.default.parallelism 1000
spark.executorEnv.OMP_NUM_THREADS 1
spark.driver.cores 3

I create the estimator using:

orca_estimator = Estimator.from_torch(model=net, optimizer=optimizer, loss=criterion, metrics=[Accuracy()], backend="bigdl")

and I get an exception in the following lines:

from bigdl.orca.learn.trigger import EveryEpoch

orca_estimator.fit(data=trainloader, epochs=epochs, validation_data=testloader, checkpoint_trigger=EveryEpoch())

Please find below the full stack trace of the error I am getting:

jep.JepException: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:98)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.createInterpreter(PythonInterpreter.scala:82)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.init(PythonInterpreter.scala:63)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.check(PythonInterpreter.scala:56)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.exec(PythonInterpreter.scala:104)
at com.intel.analytics.bigdl.orca.net.PythonFeatureSet$.$anonfun$loadPythonSet$1(PythonFeatureSet.scala:90)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:868)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:868)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.$anonfun$threadExecute$2(PythonInterpreter.scala:91)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:90)
... 28 more

Driver-side stack:
org.apache.spark.rdd.RDD.count(RDD.scala:1263)
com.intel.analytics.bigdl.orca.net.PythonFeatureSet$.loadPythonSet(PythonFeatureSet.scala:86)
com.intel.analytics.bigdl.orca.net.PythonFeatureSet.<init>(PythonFeatureSet.scala:168)
com.intel.analytics.bigdl.orca.net.PythonFeatureSet$.python(PythonFeatureSet.scala:61)
com.intel.analytics.bigdl.orca.net.python.PythonZooNet.createFeatureSetFromPyTorch(PythonZooNet.scala:283)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)

Please let me know if someone has already faced this issue in the past.

I am also requesting support from the official BigDL team on this issue, as I want to use the BigDL library for distributed deep learning training on a Spark cluster.

Thanks in advance

yangw1234 commented 2 years ago

@qiuxin2012 could you help take a look at this issue?

hkvision commented 2 years ago

Same issue as https://github.com/intel-analytics/BigDL/issues/4800?

qiuxin2012 commented 2 years ago

@amardeepjaiman As the notebook is written for Colab, you must have made a lot of changes. Could you tell us the detailed steps? Including:

  1. How did you install the conda environment on Azure?
  2. How did you install the related Python dependencies, such as torch, torchvision, jep and bigdl?
  3. How do you execute the BigDL code, from the command line or a notebook?
amardeepjaiman commented 2 years ago

I am using the Azure Databricks environment and executing it in a notebook. To set up the environment, I am using a shell script so that the required environment and dependency jar files are available on all the nodes. Please find the content of the shell script below:

#!/bin/bash

export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'
export PYTHONPATH='/databricks/python3'
/databricks/python/bin/pip install bigdl-spark3
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu six cloudpickle argparse tqdm matplotlib tensorboard -f https://download.pytorch.org/whl/torch_stable.html
JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64' /databricks/python/bin/pip install jep

I am trying to run the BigDL code in a Databricks notebook, not from the command line. Please let me know if you need more information.

qiuxin2012 commented 2 years ago

@PatrickkZ Please try to reproduce the error, or find the right way to run the notebook.

PatrickkZ commented 2 years ago

@amardeepjaiman, hi, we have already reproduced the same error on Databricks. We are looking for a way to solve this problem and will let you know when we have more information.

xbinglzh commented 2 years ago

the same error

xbinglzh commented 2 years ago

How long will it take?

xbinglzh commented 2 years ago

Layer info: TorchModel[5d5e341e]
jep.JepException: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:98)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.exec(PythonInterpreter.scala:108)
at com.intel.analytics.bigdl.orca.net.TorchModel.updateOutput(TorchModel.scala:131)
at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:283)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply$mcI$sp(DistriOptimizer.scala:272)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.utils.ThreadPool$$anonfun$1$$anon$5.call(ThreadPool.scala:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$$anonfun$5.apply(PythonInterpreter.scala:91)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$$anonfun$5.apply(PythonInterpreter.scala:90)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.intel.analytics.bigdl.orca.utils.PythonInterpreter$.threadExecute(PythonInterpreter.scala:90)
... 11 more

at com.intel.analytics.bigdl.dllib.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:289)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply$mcI$sp(DistriOptimizer.scala:272)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.optim.DistriOptimizer$$anonfun$4$$anonfun$5$$anonfun$apply$2.apply(DistriOptimizer.scala:263)
at com.intel.analytics.bigdl.dllib.utils.ThreadPool$$anonfun$1$$anon$5.call(ThreadPool.scala:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
qiuxin2012 commented 2 years ago

@amardeepjaiman @xbinglzh I failed to get the jep-backend PyTorch Estimator working, but I ran the pyspark-backend PyTorch Estimator successfully. See the example at https://github.com/intel-analytics/BigDL/blob/v2.0.0/python/orca/example/learn/pytorch/fashion_mnist/fashion_mnist.py. You can try it with the following configuration. Databricks init script:

#!/bin/bash
apt-get install openjdk-8-jdk-headless -qq > /dev/null
python -c "import os;os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'"
update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'

/databricks/python/bin/pip install numpy==1.22.3

/databricks/python/bin/pip install bigdl-orca-spark3 tqdm
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
/databricks/python/bin/pip install cloudpickle

cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars

Databricks Spark conf (spark.executor.cores and spark.cores.max should match your cluster; mine is one executor with 4 cores):

spark.driver.extraLibraryPath /databricks/python3/lib/
spark.cores.max 4
spark.executor.extraLibraryPath /databricks/python3/lib/
spark.executor.cores 4
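
As a side note, a quick way to double-check what the cluster actually allocated is to read these conf values back from the running SparkSession in the notebook before creating the estimator. This is only a minimal sketch; it assumes the spark session object that Databricks provides in the notebook:

# Sanity-check the Spark conf keys mentioned above from the notebook.
# Assumes the `spark` session that Databricks injects into the notebook.
conf = spark.sparkContext.getConf()
for key in ("spark.executor.cores", "spark.cores.max", "spark.executor.extraLibraryPath"):
    print(key, "=", conf.get(key, "<not set>"))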

You need to delete the argument parser in the notebook and use the following arguments:

    cluster_mode = "spark-submit"
    runtime = "spark"
    address=""
    backend="spark"
    batch_size=4
    epochs=2
    data_dir="./data"
    download=True
qiuxin2012 commented 2 years ago

You can use the code below in your notebook directly:

from __future__ import print_function
import os
import argparse
import numpy as np
import matplotlib.pyplot as plt

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.pytorch import Estimator
from bigdl.orca.learn.metrics import Accuracy
from bigdl.orca.learn.trigger import EveryEpoch

def train_data_creator(config={}, batch_size=4, download=True, data_dir='./data'):
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5,), (0.5,))])

    trainset = torchvision.datasets.FashionMNIST(root=data_dir,
                                                 download=download,
                                                 train=True,
                                                 transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                              shuffle=True, num_workers=0)
    return trainloader

def validation_data_creator(config={}, batch_size=4, download=True, data_dir='./data'):
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5,), (0.5,))])
    testset = torchvision.datasets.FashionMNIST(root=data_dir, train=False,
                                                download=download, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                             shuffle=False, num_workers=0)
    return testloader

# helper function to show an image
def matplotlib_imshow(img, one_channel=False):
    if one_channel:
        img = img.mean(dim=0)
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    if one_channel:
        plt.imshow(npimg, cmap="Greys")
    else:
        plt.imshow(np.transpose(npimg, (1, 2, 0)))

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def model_creator(config):
    model = Net()
    return model

def optimizer_creator(model, config):
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    return optimizer

def main():
    cluster_mode = "spark-submit"
    runtime = "spark"
    address=""
    backend="spark"
    batch_size=4
    epochs=2
    data_dir="./data"
    download=True

    if runtime == "ray":
        init_orca_context(runtime=runtime, address=address)
    else:
        if cluster_mode == "local":
            init_orca_context()
        elif cluster_mode.startswith("yarn"):
            init_orca_context(cluster_mode=cluster_mode, cores=4, num_nodes=2)
        elif cluster_mode == "spark-submit":
            init_orca_context(cluster_mode=cluster_mode)

    tensorboard_dir = data_dir+"runs"
    # constant for classes
    classes = ('T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle Boot')

    # plot some random training images
    dataiter = iter(train_data_creator(config={}, batch_size=4,
                                       download=download, data_dir=data_dir))
    images, labels = next(dataiter)

    # create grid of images
    img_grid = torchvision.utils.make_grid(images)

    # show images
    matplotlib_imshow(img_grid, one_channel=True)

    # training loss vs. epochs
    criterion = nn.CrossEntropyLoss()
    batch_size = batch_size
    epochs = epochs
    if backend == "bigdl":
        train_loader = train_data_creator(config={}, batch_size=4,
                                          download=download, data_dir=data_dir)
        test_loader = validation_data_creator(config={}, batch_size=4,
                                              download=download, data_dir=data_dir)

        net = model_creator(config={})
        optimizer = optimizer_creator(model=net, config={"lr": 0.001})
        orca_estimator = Estimator.from_torch(model=net,
                                              optimizer=optimizer,
                                              loss=criterion,
                                              metrics=[Accuracy()],
                                              backend="bigdl")

        orca_estimator.set_tensorboard(tensorboard_dir, "bigdl")

        orca_estimator.fit(data=train_loader, epochs=epochs, validation_data=test_loader,
                           checkpoint_trigger=EveryEpoch())

        res = orca_estimator.evaluate(data=test_loader)
        print("Accuracy of the network on the test images: %s" % res)
    elif backend in ["ray", "spark"]:
        orca_estimator = Estimator.from_torch(model=model_creator,
                                              optimizer=optimizer_creator,
                                              loss=criterion,
                                              metrics=[Accuracy()],
                                              model_dir=os.getcwd(),
                                              use_tqdm=True,
                                              backend=backend)
        stats = orca_estimator.fit(train_data_creator, epochs=epochs, batch_size=batch_size)

        writer = SummaryWriter(tensorboard_dir)  # log the per-epoch training loss for TensorBoard
        for stat in stats:
            writer.add_scalar("training_loss", stat['train_loss'], stat['epoch'])
        print("Train stats: {}".format(stats))
        val_stats = orca_estimator.evaluate(validation_data_creator, batch_size=batch_size)
        print("Validation stats: {}".format(val_stats))
        orca_estimator.shutdown()
    else:
        raise NotImplementedError("Only bigdl, ray, and spark are supported "
                                  "as the backend, but got {}".format(backend))

    stop_orca_context()

main()
amardeepjaiman commented 2 years ago

ok. let me try and get back to you.

amardeepjaiman commented 2 years ago

Hi @qiuxin2012 ,

I tried to use the init script you shared, but I am getting an init script failure while starting the Databricks cluster. Which Databricks runtime version are you using? Please check the attached error snapshot and cluster configuration.

Screenshot 2022-09-19 at 9 55 42 AM

Screenshot 2022-09-19 at 9 55 21 AM

qiuxin2012 commented 2 years ago

Mine is 9.1 LTS (includes Spark 3.1.2, Scala 2.12). See the image below.

(image attached)

amardeepjaiman commented 2 years ago

@qiuxin2012

The cluster is up with the init script. When I run the given source code with the Spark backend, training seems to start, but I get the following error about the model save directory in the save_pkl function:

java.io.FileNotFoundException: /databricks/driver/state.pkl

Py4JJavaError Traceback (most recent call last)

in ----> 1 main() in main() 70 use_tqdm=True, 71 backend=backend) ---> 72 stats = orca_estimator.fit(train_data_creator, epochs=epochs, batch_size=batch_size) 73 print(stats) 74 for stat in stats: /databricks/python/lib/python3.8/site-packages/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py in fit(self, data, epochs, batch_size, profile, reduce_results, info, feature_cols, label_cols, callbacks) 260 lambda iter: transform_func(iter, init_params, params)).collect() 261 --> 262 self.state_dict = PyTorchPySparkEstimator._get_state_dict_from_remote(self.model_dir) 263 worker_stats = res 264 /databricks/python/lib/python3.8/site-packages/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py in _get_state_dict_from_remote(remote_dir) 278 try: 279 temp_dir = tempfile.mkdtemp() --> 280 get_remote_file_to_local(os.path.join(remote_dir, "state.pkl"), 281 os.path.join(temp_dir, "state.pkl"), 282 over_write=True) /databricks/python/lib/python3.8/site-packages/bigdl/dllib/utils/file_utils.py in get_remote_file_to_local(remote_path, local_path, over_write) 144 145 def get_remote_file_to_local(remote_path, local_path, over_write=False): --> 146 callZooFunc("float", "getRemoteFileToLocal", remote_path, local_path, over_write) 147 148 /databricks/python/lib/python3.8/site-packages/bigdl/dllib/utils/file_utils.py in callZooFunc(bigdl_type, name, *args) 225 if not ("does not exist" in str(e) 226 and "Method {}".format(name) in str(e)): --> 227 raise e 228 else: 229 return result /databricks/python/lib/python3.8/site-packages/bigdl/dllib/utils/file_utils.py in callZooFunc(bigdl_type, name, *args) 219 try: 220 api = getattr(jinvoker, name) --> 221 java_result = api(*args) 222 result = _java2py(gateway, java_result) 223 except Exception as e: /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1302 1303 answer = self.gateway_client.send_command(command) -> 1304 return_value = get_return_value( 1305 answer, self.gateway_client, self.target_id, self.name) 1306 /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 115 def deco(*a, **kw): 116 try: --> 117 return f(*a, **kw) 118 except py4j.protocol.Py4JJavaError as e: 119 converted = convert_exception(e.java_exception) /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o372.getRemoteFileToLocal. 
: java.io.FileNotFoundException: /databricks/driver/state.pkl
at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:120)
at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:68)
at com.databricks.backend.daemon.data.client.DatabricksFileSystemV1.open(DatabricksFileSystemV1.scala:80)
at com.databricks.backend.daemon.data.client.DatabricksFileSystemV1.open(DatabricksFileSystemV1.scala:89)
at com.databricks.backend.daemon.data.client.DatabricksFileSystem.open(DatabricksFileSystem.scala:88)
at com.intel.analytics.bigdl.dllib.common.zooUtils$.getRemoteFileToLocal(zooUtils.scala:240)
at com.intel.analytics.bigdl.dllib.common.PythonZoo.getRemoteFileToLocal(PythonZoo.scala:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)

It seems the model_dir path we pass in the params is not being picked up on Databricks. I have also tried giving a shared location path on DBFS as '/dbfs/FileStore' and 'dbfs:/FileStore', but that is not picked up either. Could you please let me know how you resolved this issue? I am seeing several reported issues regarding the model_dir path on Databricks with the Spark backend.

Thanks,
Amardeep
amardeepjaiman commented 2 years ago

Hi @qiuxin2012 ,

I was able to solve the above issue using the latest nightly build of bigdl-spark3 from the BigDL repos. Training now runs with the above configuration, where I have 1 minimum worker (with 4 cores) assigned and training runs on a single worker node. However, if I change the minimum workers in the Databricks cluster configuration to 2, I have 2 worker nodes with 4 cores each, so I changed the Spark configuration from spark.cores.max 4 to spark.cores.max 8. Ideally training should now run distributed across both worker nodes using all 8 cores, but I get an exception while running this. Please find the stack trace below.

org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(5, 1) finished unsuccessfully.

Py4JJavaError Traceback (most recent call last)

in 7 checkpoint_trigger=EveryEpoch()) 8 elif backend in ["ray", "spark"]: ----> 9 stats = orca_estimator.fit(train_data_creator, epochs=epochs, batch_size=batch_size) 10 print(stats) 11 #for stat in stats: /local_disk0/spark-535f5fcb-a5b3-4f65-978d-b2904ffacebc/userFiles-3b1eb76a-8e9c-40ee-85cc-cd7442a8bd6b/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py in fit(self, data, epochs, batch_size, profile, reduce_results, info, feature_cols, label_cols, validation_data, callbacks) 294 return PytorchPysparkWorker(**init_param).train_epochs(**param) 295 --> 296 res = self.workerRDD.barrier().mapPartitions( 297 lambda iter: transform_func(iter, init_params, params)).collect() 298 /databricks/spark/python/pyspark/rdd.py in collect(self) 965 # Default path used in OSS Spark / for non-credential passthrough clusters: 966 with SCCallSiteSync(self.context) as css: --> 967 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 968 return list(_load_from_socket(sock_info, self._jrdd_deserializer)) 969 /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1302 1303 answer = self.gateway_client.send_command(command) -> 1304 return_value = get_return_value( 1305 answer, self.gateway_client, self.target_id, self.name) 1306 /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 115 def deco(*a, **kw): 116 try: --> 117 return f(*a, **kw) 118 except py4j.protocol.Py4JJavaError as e: 119 converted = convert_exception(e.java_exception) /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(5, 1) finished unsuccessfully. org.apache.spark.api.python.PythonException: 'RuntimeError: [../third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [127.0.1.1]:46496: Connection refused'. 
Full traceback below: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 754, in main process() File "/databricks/spark/python/pyspark/worker.py", line 744, in process out_iter = func(split_index, iterator) File "/databricks/spark/python/pyspark/rdd.py", line 2900, in func return f(iterator) File "/local_disk0/spark-535f5fcb-a5b3-4f65-978d-b2904ffacebc/userFiles-3b1eb76a-8e9c-40ee-85cc-cd7442a8bd6b/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py", line 297, in File "/local_disk0/spark-535f5fcb-a5b3-4f65-978d-b2904ffacebc/userFiles-3b1eb76a-8e9c-40ee-85cc-cd7442a8bd6b/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_estimator.py", line 294, in transform_func File "/local_disk0/spark-1f5a35e9-7687-40cd-a547-b12c6fb12535/executor-229da1dd-799c-4cbc-8f41-67678a47db52/spark-c9524750-d4e8-4c08-b4d7-79163cd29f60/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_worker.py", line 95, in __init__ self.setup_distributed(self.mode, cluster_info, driver_ip, driver_tcp_store_port) File "/local_disk0/spark-1f5a35e9-7687-40cd-a547-b12c6fb12535/executor-229da1dd-799c-4cbc-8f41-67678a47db52/spark-c9524750-d4e8-4c08-b4d7-79163cd29f60/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/pytorch_pyspark_worker.py", line 117, in setup_distributed self.setup_torch_distribute(tcp_store_host=driver_ip, File "/local_disk0/spark-1f5a35e9-7687-40cd-a547-b12c6fb12535/executor-229da1dd-799c-4cbc-8f41-67678a47db52/spark-c9524750-d4e8-4c08-b4d7-79163cd29f60/addedFile9081478138653500889bigdl_spark_3_1_2_2_1_0_SNAPSHOT_python_api-57d27.egg/bigdl/orca/learn/pytorch/torch_runner.py", line 174, in setup_torch_distribute dist.init_process_group( File "/databricks/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 602, in init_process_group default_pg = _new_process_group_helper( File "/databricks/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 703, in _new_process_group_helper pg = ProcessGroupGloo(prefix_store, rank, world_size, timeout=timeout) RuntimeError: [../third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [127.0.1.1]:46496: Connection refused at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:830) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:812) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:595) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) at 
org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1038) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:91) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:819) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:822) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:678) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2873) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2820) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2814) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2814) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2571) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3078) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3022) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3010) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1112) at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2494) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1036) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:419) at org.apache.spark.rdd.RDD.collect(RDD.scala:1034) at 
org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260) at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748)
qiuxin2012 commented 2 years ago

@amardeepjaiman Sorry for the late response. We have reproduced your new error; I will inform you when we find a solution. The error happens when the number of executors is >= 2.

PatrickkZ commented 2 years ago

@amardeepjaiman, hi, you can fix this by adding the environment variable GLOO_SOCKET_IFNAME.

  1. Execute !ifconfig in the notebook (see the attached image).

  2. Set GLOO_SOCKET_IFNAME to your first Ethernet interface (mine is eth0, as shown in the attached image).

This works for me when I have 2 workers; a minimal sketch of setting the variable is below.
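
Where exactly the variable gets set is a judgment call; this sketch is an assumption on my part (the screenshots above show where it was actually configured), with eth0 standing in for whatever interface name ifconfig reports on your nodes:

import os

# Driver side (notebook): make GLOO_SOCKET_IFNAME visible to the gloo backend.
# "eth0" is an assumption; use the interface name that `ifconfig` shows on your nodes.
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"

# Executor side: one option is the cluster Spark conf, for example
#   spark.executorEnv.GLOO_SOCKET_IFNAME eth0
# (Databricks also lets you set it under the cluster's Environment Variables.)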

Here is my init script:

# use the latest version of orca
/databricks/python/bin/pip install --pre --upgrade bigdl-orca-spark3
/databricks/python/bin/pip install tqdm
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
/databricks/python/bin/pip install cloudpickle

cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars

As for the model_dir problem, you can just leave it as None:

elif backend in ["ray", "spark"]:
    orca_estimator = Estimator.from_torch(model=model_creator,
                                          optimizer=optimizer_creator,
                                          loss=criterion,
                                          metrics=[Accuracy()],
                                          model_dir=None,
                                          use_tqdm=True,
                                          backend=backend)

If model_dir is not None, it should be a path starting with /dbfs or dbfs:, but this won't work until this PR is merged, so you can try it later. For now, just leave it as None. A hedged sketch of what that future usage might look like is below.
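
This is not from the original comment, just an illustration of the future usage described above; the DBFS path is a made-up example and it is not expected to work until the PR is merged:

orca_estimator = Estimator.from_torch(model=model_creator,
                                      optimizer=optimizer_creator,
                                      loss=criterion,
                                      metrics=[Accuracy()],
                                      # hypothetical DBFS location; the "/dbfs/FileStore/..." form
                                      # should also be accepted once supported
                                      model_dir="dbfs:/FileStore/orca_fashion_mnist",
                                      use_tqdm=True,
                                      backend="spark")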
