danielenricocahall / elephas

Distributed Deep learning with Keras & Spark
MIT License

Call the fit method several times #22

Open matpaolacci opened 1 year ago

matpaolacci commented 1 year ago

Overview: I need to call the fit method several times.

Proposal: Hi, I have a question about the training step. I have a huge dataset of images and I'm not able to load all of them into a Spark dataframe at once, so I have to load a batch of images at a time, build a small Spark dataframe from it, and feed it to the NN. Therefore, I need to call the fit method several times. To do this, I created a Keras model and, at each iteration, created a Spark model by always passing in the same Keras model.

I would like to know whether this approach is correct or whether there are better ways to do it.
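For concreteness, my loop looks roughly like this (chunk_paths and load_image_chunk are simplified placeholders for my actual loading code, and the layer sizes are just illustrative):

from pyspark import SparkContext
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

sc = SparkContext.getOrCreate()

# the Keras model is built and compiled only once
model = Sequential([Dense(64, activation='relu', input_shape=(784,)),
                    Dense(10, activation='softmax')])
model.compile(optimizer='adam', loss='categorical_crossentropy')

for chunk_path in chunk_paths:            # placeholder: list of image batches
    x, y = load_image_chunk(chunk_path)   # placeholder: returns numpy arrays
    rdd = to_simple_rdd(sc, x, y)
    # a new SparkModel is created each iteration, always wrapping the same Keras model
    spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
    spark_model.fit(rdd, epochs=1, batch_size=32, verbose=1)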

danielenricocahall commented 1 year ago

Hello @Mett92, thank you for reaching out! How are you currently loading the images? Are you using OpenCV or another API and then converting to an RDD? You may be able to read the images directly into Spark, which should handle data that doesn't fit into memory by letting it spill over to disk. Do you have a small example module?
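For example, something along these lines (the path and partition count are just placeholders) lets Spark keep the image dataframe on disk rather than in driver memory:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.getOrCreate()
# read the images directly into a Spark dataframe
images_df = spark.read.format("image").load("/path/to/images")
# persist partitions to disk so data that doesn't fit in memory spills there
images_df = images_df.repartition(64).persist(StorageLevel.DISK_ONLY)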

matpaolacci commented 1 year ago

Hi Daniel, first of all, thanks for your reply. Currently, I load the images using the Spark API spark.read.format("image").load("/path/to/images"). If I load all of the images at once, then when I train the model I get an exception on RDD.collectAndServe. I don't understand what causes this error, considering that all the images together take about 10 GB of memory and a single image takes about 7-15 MB; I transform each image from bytes (as read by Spark) to a list and store it in a column.
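The transformation I apply is roughly the following (the UDF is a simplified stand-in for my actual preprocessing):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, IntegerType

spark = SparkSession.builder.getOrCreate()
images_df = spark.read.format("image").load("/path/to/images")

# convert the raw image bytes read by Spark into a list of ints in a new column
bytes_to_list = F.udf(lambda b: list(b), ArrayType(IntegerType()))
images_df = images_df.withColumn("pixels", bytes_to_list(F.col("image.data")))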

My setting is:

danielenricocahall commented 1 year ago

Hm okay - could I see the full stack trace?

matpaolacci commented 1 year ago

Running in local mode with 12 GB of RAM and 7 CPUs, I got the exception below. Bear in mind that the dataframe with the images takes about 7 MB in pandas; I converted it into a Spark RDD with the to_simple_rdd function and fed it to SparkModel.
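The failing call looks roughly like this (x_train, y_train, and model are placeholders for my variables); the full output follows:

from pyspark import SparkConf, SparkContext
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

conf = SparkConf().setMaster("local[7]").setAppName("elephas-train")
sc = SparkContext.getOrCreate(conf)

# x_train / y_train come from the ~7 MB pandas dataframe mentioned above
rdd = to_simple_rdd(sc, x_train, y_train)
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=1, batch_size=16, verbose=1, validation_split=0.1)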


*Serving Flask app 'elephas.parameter.server'
*Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
*Running on http://172.17.0.2:4000
Press CTRL+C to quit
>>> Initialize workers
>>> Distribute load
23/02/17 11:18:36 WARN TaskSetManager: Stage 9 contains a task of very large size (10351 KiB). The maximum recommended task size is 1000 KiB.
Output exceeds the size limit. Open the full output data in a text editor
2023-02-17 11:18:37.546826: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.547251: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:37.562140: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.562499: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:37.567465: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.567610: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:37.568507: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.568625: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:37.570528: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.570664: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:37.583343: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.583492: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:37.595169: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-02-17 11:18:37.595302: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-02-17 11:18:44.553532: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:44.555737: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:44.557735: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:44.558705: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-17 11:18:44.686706: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:44.686781: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:44.686814: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:44.688294: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-17 11:18:44.701719: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:44.701815: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:44.701850: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:44.704179: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-17 11:18:44.730965: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:44.731070: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:44.731151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:44.731519: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-17 11:18:44.749229: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:44.749377: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:44.749429: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:44.749806: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-17 11:18:44.986324: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:44.986434: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:44.986477: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:44.990043: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-02-17 11:18:45.041096: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-02-17 11:18:45.041174: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2023-02-17 11:18:45.041208: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (97944c82b1c8): /proc/driver/nvidia/version does not exist
2023-02-17 11:18:45.041621: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
172.17.0.2 - - [17/Feb/2023 11:18:45] "GET /parameters HTTP/1.1" 200 -
172.17.0.2 - - [17/Feb/2023 11:18:45] "GET /parameters HTTP/1.1" 200 -
172.17.0.2 - - [17/Feb/2023 11:18:46] "GET /parameters HTTP/1.1" 200 -
172.17.0.2 - - [17/Feb/2023 11:18:46] "GET /parameters HTTP/1.1" 200 -
172.17.0.2 - - [17/Feb/2023 11:18:46] "GET /parameters HTTP/1.1" 200 -
172.17.0.2 - - [17/Feb/2023 11:18:46] "GET /parameters HTTP/1.1" 200 -
2023-02-17 11:18:46.296455: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 14942208 exceeds 10% of free system memory.
2023-02-17 11:18:46.332863: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 14942208 exceeds 10% of free system memory.
172.17.0.2 - - [17/Feb/2023 11:18:46] "GET /parameters HTTP/1.1" 200 -
2023-02-17 11:18:46.552715: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 14942208 exceeds 10% of free system memory.
2023-02-17 11:18:48.386972: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.391088: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.398522: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.411259: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.386799: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.430858: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.495766: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 33554432 exceeds 10% of free system memory.
2023-02-17 11:18:48.950307: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:48.952450: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:48.981427: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:49.027225: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:49.080095: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:49.101784: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:49.136378: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 65536000 exceeds 10% of free system memory.
2023-02-17 11:18:49.318652: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:49.400119: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:49.405648: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:49.462517: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:49.474879: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:49.575984: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:50.543721: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:51.050337: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 32768000 exceeds 10% of free system memory.
2023-02-17 11:18:51.051050: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:51.107407: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 32768000 exceeds 10% of free system memory.
2023-02-17 11:18:51.253463: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 32768000 exceeds 10% of free system memory.
2023-02-17 11:18:51.254138: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:51.273111: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 32768000 exceeds 10% of free system memory.
2023-02-17 11:18:51.575825: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 32768000 exceeds 10% of free system memory.
2023-02-17 11:18:51.606526: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:51.656142: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
2023-02-17 11:18:51.729061: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 32768000 exceeds 10% of free system memory.
2023-02-17 11:18:51.808355: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 26214400 exceeds 10% of free system memory.
1/3 [=========>....................] - ETA: 16s - loss: 9284.5703 - mean_absolute_error: 0.21210----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 60638)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:75: SyntaxWarning: "is not" with a literal. Did you mean "!="?
  if self.mode is not 'synchronous':
/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:75: SyntaxWarning: "is not" with a literal. Did you mean "!="?
  if self.mode is not 'synchronous':
/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:75: SyntaxWarning: "is not" with a literal. Did you mean "!="?
...
/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:75: SyntaxWarning: "is not" with a literal. Did you mean "!="?
  if self.mode is not 'synchronous':
/home/mattia/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:75: SyntaxWarning: "is not" with a literal. Did you mean "!="?
  if self.mode is not 'synchronous':
---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/rdd.py:1197, in RDD.collect(self)
   1196     assert self.ctx._jvm is not None
-> 1197     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1198 return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    189 try:
--> 190     return f(*a, **kw)
    191 except Py4JJavaError as e:

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/protocol.py:334, in get_return_value(answer, gateway_client, target_id, name)
    333     else:
--> 334         raise Py4JError(
    335             "An error occurred while calling {0}{1}{2}".
    336             format(target_id, ".", name))
    337 else:

Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
Cell In[39], line 33
     30         print("> Train starts")
     31         train_model(model, X_Y_train_df, BATCH_SIZE, EPOCHS)
---> 33 main()

Cell In[39], line 31, in main()
     29 print("> Patch Successfully created!")
     30 print("> Train starts")
---> 31 train_model(model, X_Y_train_df, BATCH_SIZE, EPOCHS)

Cell In[36], line 5, in train_model(model, X_Y_train_df, batch_size, epochs)
      2 batch_size = 16 
      3 epochs = 1
----> 5 model.fit(
      6     X_Y_train_df, 
      7     epochs=epochs, 
      8     batch_size = batch_size,
      9     verbose=1, 
     10     validation_split=0.1,
     11 )

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:186, in SparkModel.fit(self, rdd, **kwargs)
    183     rdd = rdd.repartition(self.num_workers)
    185 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 186     self._fit(rdd, **kwargs)
    187 else:
    188     raise ValueError(
    189         "Choose from one of the modes: asynchronous, synchronous or hogwild")

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/elephas/spark_model.py:214, in SparkModel._fit(self, rdd, **kwargs)
    211 worker = AsynchronousSparkWorker(
    212     model_json, parameters, self.client, train_config, freq, optimizer, loss, metrics, custom)
    213 print('>>> Distribute load')
--> 214 rdd.mapPartitions(worker.train).collect()
    215 print('>>> Async training complete.')
    216 new_parameters = self.client.get_parameters()

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/rdd.py:1197, in RDD.collect(self)
   1195 with SCCallSiteSync(self.context):
   1196     assert self.ctx._jvm is not None
-> 1197     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1198 return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/pyspark/traceback_utils.py:81, in SCCallSiteSync.__exit__(self, type, value, tb)
     79 SCCallSiteSync._spark_stack_depth -= 1
     80 if SCCallSiteSync._spark_stack_depth == 0:
---> 81     self._context._jsc.setCallSite(None)

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/java_gateway.py:1320, in JavaMember.__call__(self, *args)
   1313 args_command, temp_args = self._build_args(*args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
-> 1320 answer = self.gateway_client.send_command(command)
   1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/.conda/envs/env_test_spark/lib/python3.8/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
danielenricocahall commented 1 year ago

To verify, could you possibly run the pipeline with a subset of the data?
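For instance, something like this (the numbers are arbitrary):

# train on a small, arbitrary slice of the image dataframe
small_df = images_df.limit(100)
# or sample roughly 1% of the RDD without replacement
small_rdd = rdd.sample(withReplacement=False, fraction=0.01, seed=42)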