intel-analytics / analytics-zoo

Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray
https://analytics-zoo.readthedocs.io/
Apache License 2.0

Some random errors when running TF2Estimator on recsys full dataset #172

Closed. hkvision closed this issue 3 years ago.

hkvision commented 3 years ago

When converting SparkXShards to RayXShards:

(raylet, ip=172.16.0.113) [2021-07-22 11:06:06,984 C 134880 134880] service_based_gcs_client.cc:235: Couldn't reconnect to GCS server. The last attempted
[Stage 504:==========================================>       (849 + 158) / 1007]2021-07-22 11:07:33 ERROR DAGScheduler:91 - Failed to update accumulator 0
 (org.apache.spark.api.python.PythonAccumulatorV2) for task 717
java.net.SocketException: Connection reset

2021-07-22 11:07:33 ERROR DAGScheduler:91 - Failed to update accumulator 0 (org.apache.spark.api.python.PythonAccumulatorV2) for task 885
java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:650)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1248)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1248)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1338)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2107)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

http://172.16.0.107:7777/history/application_1626654036089_0216/jobs/ http://172.16.0.107:7777/history/application_1626654036089_0518/jobs/ http://172.16.0.107:7777/history/application_1626654036089_0563/jobs/
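
For context, the failure happens inside TF2Estimator's `fit`, at the step where the input data is converted from SparkXShards to RayXShards. A minimal sketch of that call path, with hypothetical model and column names (the real script is the `train_wnd_tf2.py` referenced in the tracebacks below):

```python
from zoo.orca.learn.tf2 import Estimator

# Hypothetical model_creator; the real one is defined in train_wnd_tf2.py.
def model_creator(config):
    import tensorflow as tf
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="sigmoid")])
    model.compile(optimizer="adam", loss="binary_crossentropy")
    return model

est = Estimator.from_keras(model_creator=model_creator)

# fit() internally calls process_spark_xshards -> RayXShards.from_spark_xshards,
# which is the step where the GCS / accumulator errors above show up.
# train_df, feature_cols and label_col come from the preprocessing step and
# are assumed to exist here.
est.fit(data=train_df,
        epochs=1,
        batch_size=1024,
        feature_cols=feature_cols,
        label_cols=[label_col])
```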

hkvision commented 3 years ago
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,926 E 71943 71943] logging.cc:415:     @     0x55cba1f5f35d ray::gcs::ServiceBasedGcsClient::GcsServiceFail
ureDetected()
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,927 E 71943 71943] logging.cc:415:     @     0x55cba1f684df _ZNSt17_Function_handlerIFvRKN3ray6StatusERKNS0
_3rpc24ReportResourceUsageReplyEEZNS4_12GcsRpcClient19ReportResourceUsageERKNS4_26ReportResourceUsageRequestERKSt8functionIS8_EEUlS3_S7_E_E9_M_invokeERKSt
9_Any_dataS3_S7_
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,928 E 71943 71943] logging.cc:415:     @     0x55cba1f6c22f ray::rpc::ClientCallImpl<>::OnReplyReceived()
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,929 E 71943 71943] logging.cc:415:     @     0x55cba1e5c772 _ZN5boost4asio6detail18completion_handlerIZN3ra
y3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,931 E 71943 71943] logging.cc:415:     @     0x55cba256e301 boost::asio::detail::scheduler::do_run_one()
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,934 E 71943 71943] logging.cc:415:     @     0x55cba256f9a9 boost::asio::detail::scheduler::run()
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,934 E 71943 71943] logging.cc:415:     @     0x55cba2571e97 boost::asio::io_context::run()
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,935 E 71943 71943] logging.cc:415:     @     0x55cba1db9ce2 main
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,936 E 71943 71943] logging.cc:415:     @     0x7fc0366f73d5 __libc_start_main
(raylet, ip=172.16.0.135) [2021-07-22 21:49:53,437 E 84791 84791] logging.cc:415:     @     0x562fbdc5fda5 (unknown)
(raylet, ip=172.16.0.102) [2021-07-22 21:44:45,679 E 159368 159368] logging.cc:415:     @     0x560eacb04da5 (unknown)
(raylet, ip=172.16.0.141) [2021-07-22 21:45:27,938 E 71943 71943] logging.cc:415:     @     0x55cba1dceda5 (unknown)
(raylet, ip=172.16.0.151) [2021-07-22 21:45:17,890 C 157490 157490] service_based_gcs_client.cc:235: Couldn't reconnect to GCS server. The last attempted
GCS server address was 172.16.0.151:43195

(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,771 C 161884 161884] service_based_gcs_client.cc:235: Couldn't reconnect to GCS server. The last attempted
GCS server address was 172.16.0.151:43195
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,771 E 161884 161884] logging.cc:415: *** Aborted at 1626961514 (unix time) try "date -d @1626961514" if you
 are using GNU date ***
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,772 E 161884 161884] logging.cc:415: PC: @                0x0 (unknown)
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,772 E 161884 161884] logging.cc:415: *** SIGABRT (@0x3db0002785c) received by PID 161884 (TID 0x7fa71e13880
0) from PID 161884; stack trace: ***
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,772 E 161884 161884] logging.cc:415:     @     0x55ee8df5f9ef google::(anonymous namespace)::FailureSignalH
andler()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,773 E 161884 161884] logging.cc:415:     @     0x7fa71db385d0 (unknown)
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,774 E 161884 161884] logging.cc:415:     @     0x7fa71d288207 __GI_raise
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,775 E 161884 161884] logging.cc:415:     @     0x7fa71d2898f8 __GI_abort
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,776 E 161884 161884] logging.cc:415:     @     0x55ee8dafde9c _ZN3ray6RayLogD2Ev.cold
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,777 E 161884 161884] logging.cc:415:     @     0x55ee8dcbf260 ray::gcs::ServiceBasedGcsClient::ReconnectGcs
Server()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,778 E 161884 161884] logging.cc:415:     @     0x55ee8dcbf35d ray::gcs::ServiceBasedGcsClient::GcsServiceFa
ilureDetected()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,779 E 161884 161884] logging.cc:415:     @     0x55ee8dcc84df _ZNSt17_Function_handlerIFvRKN3ray6StatusERKN
S0_3rpc24ReportResourceUsageReplyEEZNS4_12GcsRpcClient19ReportResourceUsageERKNS4_26ReportResourceUsageRequestERKSt8functionIS8_EEUlS3_S7_E_E9_M_invokeERK
St9_Any_dataS3_S7_
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,780 E 161884 161884] logging.cc:415:     @     0x55ee8dccc22f ray::rpc::ClientCallImpl<>::OnReplyReceived()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,781 E 161884 161884] logging.cc:415:     @     0x55ee8dbbc772 _ZN5boost4asio6detail18completion_handlerIZN3
ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E11do_completeEPvPNS1_19scheduler_operationERKNS_6system10error_codeEm
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,783 E 161884 161884] logging.cc:415:     @     0x55ee8e2ce301 boost::asio::detail::scheduler::do_run_one()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,786 E 161884 161884] logging.cc:415:     @     0x55ee8e2cf9a9 boost::asio::detail::scheduler::run()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,786 E 161884 161884] logging.cc:415:     @     0x55ee8e2d1e97 boost::asio::io_context::run()
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,787 E 161884 161884] logging.cc:415:     @     0x55ee8db19ce2 main
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,788 E 161884 161884] logging.cc:415:     @     0x7fa71d2743d5 __libc_start_main
(raylet, ip=172.16.0.149) [2021-07-22 21:45:14,790 E 161884 161884] logging.cc:415:     @     0x55ee8db2eda5 (unknown)
(raylet, ip=172.16.0.159) [2021-07-22 21:44:45,130 C 142559 142559] service_based_gcs_client.cc:235: Couldn't reconnect to GCS server. The last attempted
GCS server address was 172.16.0.151:43195
(raylet, ip=172.16.0.159) [2021-07-22 21:44:45,130 E 142559 142559] logging.cc:415: *** Aborted at 1626961485 (unix time) try "date -d @1626961485" if you
[Stage 548:>                                                   (0 + 441) / 1007]2021-07-22 21:51:31 ERROR DAGScheduler:91 - Failed to update accumulator 0
 (org.apache.spark.api.python.PythonAccumulatorV2) for task 210
java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.net.SocketInputStream.read(SocketInputStream.java:224)
        at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:652)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1248)
hkvision commented 3 years ago

Script here: https://github.com/analytics-zoo/friesian/pull/99. The error occurs occasionally. According to @jenniew, she also can't run on large data. We'd better investigate this problem. I already give ~1T of memory for 200G of data, so why might it still fail?

shanyu-sys commented 3 years ago

Sure, I will look into the issue.

hkvision commented 3 years ago

If we have time, maybe we should further investigate the dlrm Criteo dataset; @jenniew reported that she can't run it using RayXShards. As far as I tested, the processed data should be <800g and can run on 8 nodes x 100g object store memory.

hkvision commented 3 years ago
Traceback (most recent call last):
  File "train_wnd_tf2.py", line 258, in <module>
    label_cols=[column_info.label])
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/learn/tf2/estimator.py", line 231, in fit
    ray_xshards = process_spark_xshards(data, self.num_workers)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/utils.py", line 300, in process_spark_xshards
    ray_xshards = RayXShards.from_spark_xshards(data)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 355, in from_spark_xshards
    return RayXShards._from_spark_xshards_ray_api(spark_xshards)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 387, in _from_spark_xshards_ray_api
    result = result_rdd.collect()
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 663 in stage 16.0 failed 4 times, most recent failure: Lost task 663.3 in stage 16.0 (TID 7866, Almaren-Node-140, executor 3): java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)

2021-07-23 18:19:44 ERROR TransportRequestHandler:276 - Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)

http://172.16.0.107:7777/history/application_1626654036089_0342/jobs/ This error was also encountered.

hkvision commented 3 years ago
Traceback (most recent call last):
  File "train_wnd_tf2.py", line 258, in <module>
    label_cols=[column_info.label])
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/learn/tf2/estimator.py", line 231, in fit
    ray_xshards = process_spark_xshards(data, self.num_workers)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/utils.py", line 300, in process_spark_xshards
    ray_xshards = RayXShards.from_spark_xshards(data)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 355, in from_spark_xshards
    return RayXShards._from_spark_xshards_ray_api(spark_xshards)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 383, in _from_spark_xshards_ray_api
    ray.get([v.get_partitions.remote() for v in partition_stores.values()])
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::LocalStore.get_partitions() (pid=24448, ip=172.16.0.169)
  File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
RuntimeError: The actor with name LocalStore failed to be imported, and so cannot execute this method.

http://172.16.0.107:7777/history/application_1626654036089_0496/jobs/ http://172.16.0.107:7777/history/application_1626654036089_0548/jobs/

(pid=175199, ip=172.16.0.114) 2021-07-28 13:27:48,099   ERROR function_manager.py:498 -- Failed to load actor class LocalStore.
(pid=175199, ip=172.16.0.114) Traceback (most recent call last):
(pid=175199, ip=172.16.0.114)   File "/disk4/yarn/nm/usercache/root/appcache/application_1626654036089_0548/container_1626654036089_0548_01_000008/python_en
v/lib/python3.7/site-packages/ray/function_manager.py", line 496, in _load_actor_class_from_gcs
(pid=175199, ip=172.16.0.114) ModuleNotFoundError: No module named 'zoo'
shanyu-sys commented 3 years ago
Traceback (most recent call last):
  File "train_wnd_tf2.py", line 258, in <module>
    label_cols=[column_info.label])
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/learn/tf2/estimator.py", line 231, in fit
    ray_xshards = process_spark_xshards(data, self.num_workers)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/utils.py", line 300, in process_spark_xshards
    ray_xshards = RayXShards.from_spark_xshards(data)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 355, in from_spark_xshards
    return RayXShards._from_spark_xshards_ray_api(spark_xshards)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 383, in _from_spark_xshards_ray_api
    ray.get([v.get_partitions.remote() for v in partition_stores.values()])
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/recsys-kai/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::LocalStore.get_partitions() (pid=24448, ip=172.16.0.169)
  File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
RuntimeError: The actor with name LocalStore failed to be imported, and so cannot execute this method.

http://172.16.0.107:7777/history/application_1626654036089_0496/jobs/ http://172.16.0.107:7777/history/application_1626654036089_0548/jobs/

(pid=175199, ip=172.16.0.114) 2021-07-28 13:27:48,099   ERROR function_manager.py:498 -- Failed to load actor class LocalStore.
(pid=175199, ip=172.16.0.114) Traceback (most recent call last):
(pid=175199, ip=172.16.0.114)   File "/disk4/yarn/nm/usercache/root/appcache/application_1626654036089_0548/container_1626654036089_0548_01_000008/python_en
v/lib/python3.7/site-packages/ray/function_manager.py", line 496, in _load_actor_class_from_gcs
(pid=175199, ip=172.16.0.114) ModuleNotFoundError: No module named 'zoo'

This occurs when converting the Spark DataFrame to SparkXShards. You can find the cause in the error below:

ERROR YarnScheduler:70 - Lost executor 1 on Almaren-Node-164: Container killed by YARN for exceeding memory limits.  160.2 GB of 160 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

So you should increase the spark.yarn.executor.memoryOverhead.
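
For example, the overhead can be raised via the Spark conf. A minimal generic sketch with an illustrative value (size it to your own data and executor memory); this is not the exact recsys script configuration:

```python
from pyspark import SparkConf

# YARN kills a container once executor memory + overhead is exceeded, so the
# off-heap usage (Python workers, serialization buffers, etc.) needs explicit
# headroom. "100g" is an illustrative value only.
conf = (SparkConf()
        .set("spark.executor.memory", "30g")
        .set("spark.yarn.executor.memoryOverhead", "100g"))
```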

Regarding the memory usage, I tested with fewer partitions:

| parquet files | records | read as train_df/test_df | estimator.from_keras | df2sparkxshards | rayxshards | training peak | training static |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 2 | 762388 | 1.86G | 12g | 5g | 0.6g | 30g | 20g |
| 6 | 2244187 | 3.4G | 19.4g | 11.3g | 1g | 27g | 16.1g |
| 16 | 6063285 | 5.9G | 18G | 25.7g | - | 30.5g | 20g |

df2sparkxshards used about 25g with 16 partitions. When running the full dataset, each executor uses 44 cores and handles 44 partitions at the same time, so the memory needed in df2SparkXShards could be almost 70g.
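
A rough back-of-the-envelope check of that estimate, assuming df2sparkxshards memory scales roughly linearly with the number of partitions handled concurrently:

```python
# Measured above: ~25.7g for 16 partitions; the full run handles 44 partitions
# per executor concurrently (one per core).
mem_16_partitions_gb = 25.7
concurrent_partitions = 44

estimated_gb = mem_16_partitions_gb * concurrent_partitions / 16
print(f"~{estimated_gb:.0f} GB per executor")  # ~71 GB, i.e. almost 70g
```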

shanyu-sys commented 3 years ago
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,348 E 183437 183437] logging.cc:415: *** Aborted at 1627440876 (unix time) try "date -d @1627440876" if you are using GNU date ***
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,349 E 183437 183437] logging.cc:415: PC: @                0x0 (unknown)
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,350 E 183437 183437] logging.cc:415: *** SIGSEGV (@0x0) received by PID 183437 (TID 0x7f54fcd5c800) from PID 0; stack trace: ***
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,350 E 183437 183437] logging.cc:415:     @     0x5645cd65d5df google::(anonymous namespace)::FailureSignalHandler()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,350 E 183437 183437] logging.cc:415:     @     0x7f54fc75c5d0 (unknown)
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,352 E 183437 183437] logging.cc:415:     @     0x5645cd506f7a ray::gcs::CallbackReply::CallbackReply()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,353 E 183437 183437] logging.cc:415:     @     0x5645cd507429 ray::gcs::GlobalRedisCallback()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,354 E 183437 183437] logging.cc:415:     @     0x5645cd51bb5c redisProcessCallbacks
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,355 E 183437 183437] logging.cc:415:     @     0x5645cd50dbaa ray::gcs::RedisAsyncContext::RedisAsyncHandleRead()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,356 E 183437 183437] logging.cc:415:     @     0x5645cd50d28e RedisAsioClient::handle_io()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,357 E 183437 183437] logging.cc:415:     @     0x5645cd50cf36 boost::asio::detail::reactive_null_buffers_op<>::do_complete()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,358 E 183437 183437] logging.cc:415:     @     0x5645cd7b0ab1 boost::asio::detail::scheduler::do_run_one()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,360 E 183437 183437] logging.cc:415:     @     0x5645cd7b21c9 boost::asio::detail::scheduler::run()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,360 E 183437 183437] logging.cc:415:     @     0x5645cd7b42d7 boost::asio::io_context::run()
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,361 E 183437 183437] logging.cc:415:     @     0x5645cd359cbc main
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,362 E 183437 183437] logging.cc:415:     @     0x7f54fbe983d5 __libc_start_main
(pid=gcs_server, ip=172.16.0.135) [2021-07-28 10:54:36,363 E 183437 183437] logging.cc:415:     @     0x5645cd36bf57 (unknown)

This error might be related to PR https://github.com/ray-project/ray/pull/16620, so it might be solved by upgrading Ray.

jenniew commented 3 years ago

After upgrading to ray==1.5.0, I get the following error:

Traceback (most recent call last):
  File "train_wnd_tf2.py", line 258, in <module>
    label_cols=[column_info.label])
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/learn/tf2/estimator.py", line 230, in fit
    ray_xshards = process_spark_xshards(data, self.num_workers)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/utils.py", line 300, in process_spark_xshards
    ray_xshards = RayXShards.from_spark_xshards(data)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 355, in from_spark_xshards
    return RayXShards._from_spark_xshards_ray_api(spark_xshards)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 387, in _from_spark_xshards_ray_api
    result = result_rdd.collect()
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
[Stage 16:>                                                     (0 + 94) / 1007]py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 16.0 failed 4 times, most recent failure: Lost task 6.3 in stage 16.0 (TID 5780, Almaren-Node-137, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 386, in <lambda>
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/python_env/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 79, in write_to_ray
    local_store = ray.get_actor(local_store_name)
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/python_env/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 82, in wrapper
    return func(*args, **kwargs)
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/python_env/lib/python3.7/site-packages/ray/worker.py", line 1729, in get_actor
    handle = worker.core_worker.get_named_actor_handle(name)
  File "python/ray/_raylet.pyx", line 1565, in ray._raylet.CoreWorker.get_named_actor_handle
  File "python/ray/_raylet.pyx", line 158, in ray._raylet.check_status
ValueError: Failed to look up actor with name 'partition:54e8e3b7-b6a1-4e29-b402-ddd2925048b6:node:172.16.0.137'. You are either trying to look up a named actor you didn't create, the named actor died, or the actor hasn't been created because named actor creation is asynchronous.

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
shanyu-sys commented 3 years ago

After upgrading to ray==1.5.0, I get the following error:

Traceback (most recent call last):
  File "train_wnd_tf2.py", line 258, in <module>
    label_cols=[column_info.label])
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/learn/tf2/estimator.py", line 230, in fit
    ray_xshards = process_spark_xshards(data, self.num_workers)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/utils.py", line 300, in process_spark_xshards
    ray_xshards = RayXShards.from_spark_xshards(data)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 355, in from_spark_xshards
    return RayXShards._from_spark_xshards_ray_api(spark_xshards)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 387, in _from_spark_xshards_ray_api
    result = result_rdd.collect()
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
[Stage 16:>                                                     (0 + 94) / 1007]py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 16.0 failed 4 times, most recent failure: Lost task 6.3 in stage 16.0 (TID 5780, Almaren-Node-137, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/anaconda3/envs/intel_tf24_37/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 386, in <lambda>
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/python_env/lib/python3.7/site-packages/zoo/orca/data/ray_xshards.py", line 79, in write_to_ray
    local_store = ray.get_actor(local_store_name)
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/python_env/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 82, in wrapper
    return func(*args, **kwargs)
  File "/yarn/nm/usercache/root/appcache/application_1626654036089_0642/container_1626654036089_0642_01_000004/python_env/lib/python3.7/site-packages/ray/worker.py", line 1729, in get_actor
    handle = worker.core_worker.get_named_actor_handle(name)
  File "python/ray/_raylet.pyx", line 1565, in ray._raylet.CoreWorker.get_named_actor_handle
  File "python/ray/_raylet.pyx", line 158, in ray._raylet.check_status
ValueError: Failed to look up actor with name 'partition:54e8e3b7-b6a1-4e29-b402-ddd2925048b6:node:172.16.0.137'. You are either trying to look up a named actor you didn't create, the named actor died, or the actor hasn't been created because named actor creation is asynchronous.

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I will create a PR for ray 1.5.0.

To adapt to ray 1.5.0, we need to change our code for the new Ray namespace feature.
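
For context, a minimal sketch of the namespace behavior introduced in Ray 1.5.0 (illustrative only, not the actual analytics-zoo patch; the actor and namespace names are made up): named actors are now scoped to a namespace, and connections that don't specify one get a random anonymous namespace, which is why lookups like `ray.get_actor('partition:...')` can fail with the ValueError above.

```python
import ray

# Ray 1.5.0 scopes named actors to a namespace; without an explicit namespace
# each driver/job gets an anonymous one, so named actors created elsewhere are
# not visible. "orca" is an illustrative namespace name.
ray.init(address="auto", namespace="orca")

@ray.remote
class LocalStore:
    def __init__(self):
        self.partitions = []

    def get_partitions(self):
        return self.partitions

# Create a detached named actor in the "orca" namespace...
store = LocalStore.options(name="partition:node-1", lifetime="detached").remote()

# ...so that any worker connecting with namespace="orca" can look it up again.
handle = ray.get_actor("partition:node-1")
print(ray.get(handle.get_partitions.remote()))
```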

shanyu-sys commented 3 years ago

@hkvision @jenniew

I have created a temporary PR intel-analytics/analytics-zoo#4383 that may help with the issue. Could you kindly give it a try? Note that it requires ray==1.5.0.

I tried several times on Almaren and it seems to work for me. Some of my memory configs are listed below.

conf = {"spark.executor.memoryOverhead": "130g"}
executor_cores = 44
num_executor = 12
executor_memory = "30g"
object_store_memory="40g"

I didn't change the other configs in Kai's script.
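
For reference, a sketch of how these values map onto context initialization; this assumes the zoo.orca init_orca_context API at the time, and the exact keyword names used in Kai's script may differ:

```python
from zoo.orca import init_orca_context

# Values from the list above; keyword names are assumptions based on the
# zoo.orca API, not copied from the actual script.
conf = {"spark.executor.memoryOverhead": "130g"}

sc = init_orca_context(
    cluster_mode="yarn-client",
    cores=44,                    # executor_cores
    num_nodes=12,                # num_executor
    memory="30g",                # executor_memory
    object_store_memory="40g",
    conf=conf,
    init_ray_on_spark=True,
)
```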

shanyu-sys commented 3 years ago

PRs intel-analytics/analytics-zoo#4386 and intel-analytics/analytics-zoo#4387 for this issue have been merged. You can update to the latest zoo and give it a try. Note that ray==1.5.0 could help with the issue. The temporary PR intel-analytics/analytics-zoo#4383 will be closed.

jenniew commented 3 years ago

After trying the latest zoo code and ray 1.5.0, I didn't hit this issue again.

shanyu-sys commented 3 years ago

After trying the latest zoo code and ray 1.5.0, I didn't hit this issue again.

Great! It seems to work.

I will close the issue; please contact me about any further problems.