recommenders-team / recommenders

Best Practices on Recommendation Systems
https://recommenders-team.github.io/recommenders/intro.html
MIT License

[HELP] als_deep_dive problem with Spark #1118

Closed gallogiulia closed 4 years ago

gallogiulia commented 4 years ago

Description

I am using the als_deep_dive notebook and experimenting with the size of the Spark instance and the number of iterations of the ALS algorithm. Everything works until I increase the iterations from 15 to 20, or change the memory from 16 GB to 32 GB in this line: spark = start_or_get_spark("ALS Deep Dive", memory="32g"). For example, with max iterations = 20 and memory="32g", I get the following error after running the Evaluation cell (SparkRatingEvaluation). The error is very long, so I am pasting only the parts that don't repeat.

This is my environment:

System version: 3.6.9 |Anaconda, Inc.| (default, Jul 30 2019, 19:07:31) 
[GCC 7.3.0]
Pandas version: 1.0.4
PySpark version: 2.4.5
Python 3.6 - AzureML (environment in Jupyter notebook)
Compute Instance: STANDARD_DS13_V2

evaluations = SparkRatingEvaluation(
    dfs_test, 
    dfs_pred,
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction=COL_PREDICTION
)

print(
    "RMSE score = {}".format(evaluations.rmse()),
    "MAE score = {}".format(evaluations.mae()),
    "R2 score = {}".format(evaluations.rsquared()),
    "Explained variance score = {}".format(evaluations.exp_var()),
    sep="\n"
)

--- Logging error ---
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 35068)
Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/anaconda/envs/azureml_py36/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/anaconda/envs/azureml_py36/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/anaconda/envs/azureml_py36/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError
----------------------------------------
--- Logging error ---
ERROR - An error occurred while trying to connect to the Java server (127.0.0.1:35159)
Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-13-6e4ac0080a04>", line 7, in <module>
    col_prediction=COL_PREDICTION
  File "../../reco_utils/evaluation/spark_evaluation.py", line 74, in __init__
    if rating_pred.count() == 0:
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 523, in count
    return int(self._jdf.count())
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2044, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'Py4JJavaError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1152, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/anaconda/envs/azureml_py36/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

.... etc
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/IPython/core/interactiveshell.py in run_code(self, code_obj, result, async_)
   3330                 else:
-> 3331                     exec(code_obj, self.user_global_ns, self.user_ns)
   3332             finally:

<ipython-input-13-6e4ac0080a04> in <module>
      6     col_rating=COL_RATING,
----> 7     col_prediction=COL_PREDICTION
      8 )

/mnt/batch/tasks/shared/LS_root/mounts/clusters/hrbl-ncx-compute3/code/recommenders/reco_utils/evaluation/spark_evaluation.py in __init__(self, rating_true, rating_pred, col_user, col_item, col_rating, col_prediction)
     73             raise ValueError("Empty input dataframe")
---> 74         if rating_pred.count() == 0:
     75             raise ValueError("Empty input dataframe")

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/sql/dataframe.py in count(self)
    522         """
--> 523         return int(self._jdf.count())
    524 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

<class 'str'>: (<class 'py4j.protocol.Py4JNetworkError'>, Py4JNetworkError('An error occurred while trying to connect to the Java server (127.0.0.1:35159)',))

During handling of the above exception, another exception occurred:

IndexError                                Traceback (most recent call last)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in _get_connection(self)
    928         try:
--> 929             connection = self.deque.pop()
    930         except IndexError:

IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in start(self)
   1066         try:
-> 1067             self.socket.connect((self.address, self.port))
   1068             self.stream = self.socket.makefile("rb")

ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Py4JNetworkError                          Traceback (most recent call last)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/IPython/core/interactiveshell.py in run_code(self, code_obj, result, async_)
   3346             if result is not None:
   3347                 result.error_in_exec = sys.exc_info()[1]
-> 3348             self.showtraceback(running_compiled_code=True)
   3349         else:
   3350             outflag = False

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/IPython/core/interactiveshell.py in showtraceback(self, exc_tuple, filename, tb_offset, exception_only, running_compiled_code)
   2047                                             value, tb, tb_offset=tb_offset)
   2048 
-> 2049                     self._showtraceback(etype, value, stb)
   2050                     if self.call_pdb:
   2051                         # drop into debugger

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/ipykernel/zmqshell.py in _showtraceback(self, etype, evalue, stb)
    544             u'traceback' : stb,
    545             u'ename' : unicode_type(etype.__name__),
--> 546             u'evalue' : py3compat.safe_unicode(evalue),
    547         }
    548 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/ipython_genutils/py3compat.py in safe_unicode(e)
     63     """
     64     try:
---> 65         return unicode_type(e)
     66     except UnicodeError:
     67         pass

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/protocol.py in __str__(self)
    466     def __str__(self):
    467         gateway_client = self.java_exception._gateway_client
--> 468         answer = gateway_client.send_command(self.exception_cmd)
    469         return_value = get_return_value(answer, gateway_client, None, None)
    470         return "{0}: {1}".format(self.errmsg, return_value)

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in send_command(self, command, retry, binary)
    981          if `binary` is `True`.
    982         """
--> 983         connection = self._get_connection()
    984         try:
    985             response = connection.send_command(command)

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in _get_connection(self)
    929             connection = self.deque.pop()
    930         except IndexError:
--> 931             connection = self._create_connection()
    932         return connection
    933 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in _create_connection(self)
    935         connection = GatewayConnection(
    936             self.gateway_parameters, self.gateway_property)
--> 937         connection.start()
    938         return connection
    939 

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in start(self)
   1077                 "server ({0}:{1})".format(self.address, self.port)
   1078             logger.exception(msg)
-> 1079             raise Py4JNetworkError(msg, e)
   1080 
   1081     def _authenticate_connection(self):

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:35159)

Can you help me figure out how to increase the number of iterations I can use when training the ALS algorithm? Thanks

gramhagen commented 4 years ago

This seems to be an issue with Spark running out of memory. There are some helpful resources on configuring Spark here:
https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2/
http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/

One configuration parameter that can be particularly useful to adjust is yarn_scheduler_maximum_allocation_mb.
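
For a single-machine notebook session like the one in this issue, the settings that tend to matter most are the driver ones. Below is a minimal sketch, assuming a local-mode session built directly with SparkSession.builder rather than through start_or_get_spark. The helper names build_local_spark_conf and start_local_spark, the 8 GB headroom, the 25% result-size share, and the 56 GB machine size are all illustrative assumptions, not values taken from the repository or from this issue.

```python
def build_local_spark_conf(total_ram_gb, headroom_gb=8, result_fraction=0.25):
    """Return driver settings that leave headroom_gb of RAM for the OS and Python.

    All defaults here are illustrative assumptions, not recommended values.
    """
    if total_ram_gb <= headroom_gb:
        raise ValueError("total_ram_gb must exceed headroom_gb")
    driver_gb = int(total_ram_gb - headroom_gb)
    # Cap the size of results collected to the driver at a share of its heap.
    result_gb = max(1, int(driver_gb * result_fraction))
    return {
        "spark.driver.memory": "{}g".format(driver_gb),
        "spark.driver.maxResultSize": "{}g".format(result_gb),
    }


def start_local_spark(app_name, conf):
    """Start a local-mode session with the given settings (needs pyspark and Java)."""
    # Imported lazily so build_local_spark_conf can be used without pyspark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).master("local[*]")
    for key, value in conf.items():
        builder = builder.config(key, value)
    # getOrCreate() returns any session that already exists, and the driver heap
    # size is fixed when the JVM launches, so restart the kernel before calling this.
    return builder.getOrCreate()


# Hypothetical machine with 56 GB of RAM.
conf = build_local_spark_conf(total_ram_gb=56)
print(conf)
```

Calling start_local_spark("ALS Deep Dive", conf) in a fresh kernel would then apply these values. If a session is already running, the new driver memory is likely to be ignored, which can make a change from 16g to 32g appear to have no effect.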

Hope that helps. Tuning Spark can be difficult; I sometimes find it useful to spin up a large cluster in Azure Databricks to figure out whether the issue is a memory constraint or something else.
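
Before moving to a bigger cluster, one cheap check is whether the requested heap even fits on the current machine. The sketch below is a rough, assumption-laden sanity check: parse_memory, physical_memory_bytes, fits_in_ram, and the 20% headroom are hypothetical names and values chosen for illustration, and os.sysconf is only available on Unix-like systems.

```python
import os
import re

_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_memory(text):
    """Convert a JVM-style size such as '32g' or '512mb' to bytes."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", text.strip().lower())
    if match is None:
        raise ValueError("unrecognized memory string: {!r}".format(text))
    return int(match.group(1)) * _UNITS[match.group(2)]


def physical_memory_bytes():
    """Total physical RAM as reported by the OS (Unix-like systems only)."""
    return os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")


def fits_in_ram(requested, total_bytes, headroom_fraction=0.2):
    """True if the requested heap leaves headroom_fraction of RAM unused."""
    return parse_memory(requested) <= total_bytes * (1 - headroom_fraction)


total = physical_memory_bytes()
for setting in ("16g", "32g"):
    verdict = "fits" if fits_in_ram(setting, total) else "too large for this machine"
    print(setting, verdict)
```

In a live session, spark.sparkContext.getConf().get("spark.driver.memory") shows the value the session was configured with, which can be compared against what was requested. If the requested heap fits comfortably and the job still fails, the cause may be something other than a plain memory limit.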

gallogiulia commented 4 years ago

@gramhagen Thank you so much for your help with this. I will look at the links you provided.