Sparkling Water Version
3.44.0.3
Issue description
Hello, everyone!
I have a list of discounts and iterate over it. Within each iteration, I generate a few new columns and then predict the sales amount for every product associated with that discount. I want to compile the results from all iterations into a single dataframe so that I have a comprehensive view of every discount on the list together with its predictions. This process is already implemented successfully in native H2O with Pandas; I am now trying to replicate it with Sparkling Water (H2O on Spark) and PySpark.
Unfortunately, when attempting to read the dataframe, I encounter the following error:
RestApiCommunicationException: H2O node http://10.159.20.11:54321 responded with org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 256.0 failed 4 times, most recent failure: Lost task 0.3 in stage 256.0 (TID 3451) (10.159.20.11 executor 0): ai.h2o.sparkling.backend.exceptions.RestApiCommunicationException: H2O node http://10.159.20.11:54321 responded with Status code: 400 : Bad Request
I am seeking a workaround or advice on whether I might be making a mistake.
Expected Behavior: I should be able to access and manipulate the dataframe without issues.
Observed Behavior: The error "RestApiCommunicationException: H2O node http://10.159.20.11:54321/ responded with" prevents further progress.
What I am doing now is saving the predictions to a Delta table and then reading it back, but that takes more time than I expected. I know loops are not recommended in PySpark, but I am also not seeing a better way of doing this.
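For reference, the current workaround looks roughly like this (a minimal sketch; the table name and the score_one_discount helper are placeholders, not the real code, and the actual loop body is the one shown under "Code to reproduce the issue" below):

# Sketch of the Delta-table workaround: persist each iteration's predictions
# and read the combined result back once the loop is done.
for discount in price_discounts:
    scored_df = score_one_discount(df, discount)  # placeholder for the real loop body
    scored_df.write.format("delta").mode("append").saveAsTable("sales_predictions_tmp")

predictions = spark.read.table("sales_predictions_tmp")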
Thank you so much!
Programming language used
Python
Programming language version
3.8
What environment are you running Sparkling Water on?
Cloud Managed Spark (like Databricks, AWS Glue)
Environment version info
13.3.x-cpu-ml-scala2.12
Brief cluster specification
1-3 workers: 32-96 GB memory, 8-24 cores /// 1 driver: 32 GB memory, 8 cores
Relevant log output
RestApiCommunicationException: H2O node http://10.159.20.11:54321 responded with
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 256.0 failed 4 times, most recent failure: Lost task 0.3 in stage 256.0 (TID 3451) (10.159.20.11 executor 0): ai.h2o.sparkling.backend.exceptions.RestApiCommunicationException: H2O node http://10.159.20.11:54321 responded with
Status code: 400 : Bad Request
Server error: {"__meta":{"schema_version":3,"schema_name":"H2OErrorV3","schema_type":"H2OError"},"timestamp":1710933634967,"error_url":"/3/Chunk","msg":"\n\nERROR MESSAGE:\n\nA frame with name 'py_5_sid_8ef7' doesn't exist.\n\n","dev_msg":"\n\nERROR MESSAGE:\n\nA frame with name 'py_5_sid_8ef7' doesn't exist.\n\n","http_status":400,"values":{},"exception_type":"java.lang.IllegalArgumentException","exception_msg":"\n\nERROR MESSAGE:\n\nA frame with name 'py_5_sid_8ef7' doesn't exist.\n\n","stacktrace":["java.lang.IllegalArgumentException: A frame with name 'py_5_sid_8ef7' doesn't exist."," ai.h2o.sparkling.extensions.rest.api.ChunkServlet$POSTRequestParameters.validate(ChunkServlet.scala:44)"," ai.h2o.sparkling.extensions.rest.api.ChunkServlet.$anonfun$doPost$1(ChunkServlet.scala:119)"," ai.h2o.sparkling.extensions.rest.api.ServletBase.processRequest(ServletBase.scala:36)"," ai.h2o.sparkling.extensions.rest.api.ChunkServlet.doPost(ChunkServlet.scala:117)"," javax.servlet.http.HttpServlet.service(HttpServlet.java:523)"," javax.servlet.http.HttpServlet.service(HttpServlet.java:590)"," ai.h2o.org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)"," ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)"," ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)"," ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)"," ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)"," water.webserver.jetty9.Jetty9ServerAdapter$LoginHandler.handle(Jetty9ServerAdapter.java:130)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)"," ai.h2o.org.eclipse.jetty.server.Server.handle(Server.java:531)"," ai.h2o.org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)"," ai.h2o.org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)"," ai.h2o.org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)"," ai.h2o.org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)"," ai.h2o.org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)"," ai.h2o.org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)"," ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)"," ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)"," 
java.lang.Thread.run(Thread.java:750)"]}
at ai.h2o.sparkling.backend.utils.RestCommunication.checkResponseCode(RestCommunication.scala:405)
at ai.h2o.sparkling.backend.utils.RestCommunication.checkResponseCode$(RestCommunication.scala:384)
at ai.h2o.sparkling.backend.H2OChunk$.checkResponseCode(H2OChunk.scala:32)
at ai.h2o.sparkling.backend.utils.RestCommunication.readURLContent(RestCommunication.scala:376)
at ai.h2o.sparkling.backend.utils.RestCommunication.readURLContent$(RestCommunication.scala:360)
at ai.h2o.sparkling.backend.H2OChunk$.readURLContent(H2OChunk.scala:32)
at ai.h2o.sparkling.backend.H2OChunk$.getChunkAsInputStream(H2OChunk.scala:53)
at ai.h2o.sparkling.backend.Reader.<init>(Reader.scala:45)
at ai.h2o.sparkling.backend.H2ODataFrame$$anon$1.reader$lzycompute(H2ODataFrame.scala:74)
at ai.h2o.sparkling.backend.H2ODataFrame$$anon$1.reader(H2ODataFrame.scala:66)
at ai.h2o.sparkling.backend.H2OSparkEntity$H2OChunkIterator.hasNext(H2OSparkEntity.scala:50)
at ai.h2o.sparkling.backend.H2OSparkEntity$H2OChunkIterator.hasNext$(H2OSparkEntity.scala:50)
at ai.h2o.sparkling.backend.H2ODataFrame$$anon$1.hasNext(H2ODataFrame.scala:64)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$5(UnsafeRowBatchUtils.scala:88)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$3(UnsafeRowBatchUtils.scala:88)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$1(UnsafeRowBatchUtils.scala:68)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$2(Collector.scala:197)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:899)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:902)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:797)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3645)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3567)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3554)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3554)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1521)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3890)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3790)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1245)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1233)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:338)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:282)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:366)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:363)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:117)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:553)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3628)
at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3619)
at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4544)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:959)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4542)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:298)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:528)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:225)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:154)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:477)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4542)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3618)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:773)
at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1105)
at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:261)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)
Caused by: ai.h2o.sparkling.backend.exceptions.RestApiCommunicationException: H2O node http://10.159.20.11:54321 responded with
Status code: 400 : Bad Request
Server error: {"__meta":{"schema_version":3,"schema_name":"H2OErrorV3","schema_type":"H2OError"},"timestamp":1710933634967,"error_url":"/3/Chunk","msg":"\n\nERROR MESSAGE:\n\nA frame with name 'py_5_sid_8ef7' doesn't exist.\n\n","dev_msg":"\n\nERROR MESSAGE:\n\nA frame with name 'py_5_sid_8ef7' doesn't exist.\n\n","http_status":400,"values":{},"exception_type":"java.lang.IllegalArgumentException","exception_msg":"\n\nERROR MESSAGE:\n\nA frame with name 'py_5_sid_8ef7' doesn't exist.\n\n","stacktrace":["java.lang.IllegalArgumentException: A frame with name 'py_5_sid_8ef7' doesn't exist."," ai.h2o.sparkling.extensions.rest.api.ChunkServlet$POSTRequestParameters.validate(ChunkServlet.scala:44)"," ai.h2o.sparkling.extensions.rest.api.ChunkServlet.$anonfun$doPost$1(ChunkServlet.scala:119)"," ai.h2o.sparkling.extensions.rest.api.ServletBase.processRequest(ServletBase.scala:36)"," ai.h2o.sparkling.extensions.rest.api.ChunkServlet.doPost(ChunkServlet.scala:117)"," javax.servlet.http.HttpServlet.service(HttpServlet.java:523)"," javax.servlet.http.HttpServlet.service(HttpServlet.java:590)"," ai.h2o.org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)"," ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)"," ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)"," ai.h2o.org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)"," ai.h2o.org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)"," ai.h2o.org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)"," water.webserver.jetty9.Jetty9ServerAdapter$LoginHandler.handle(Jetty9ServerAdapter.java:130)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)"," ai.h2o.org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)"," ai.h2o.org.eclipse.jetty.server.Server.handle(Server.java:531)"," ai.h2o.org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)"," ai.h2o.org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)"," ai.h2o.org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)"," ai.h2o.org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)"," ai.h2o.org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)"," ai.h2o.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)"," ai.h2o.org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)"," ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)"," ai.h2o.org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)"," 
java.lang.Thread.run(Thread.java:750)"]}
at ai.h2o.sparkling.backend.utils.RestCommunication.checkResponseCode(RestCommunication.scala:405)
at ai.h2o.sparkling.backend.utils.RestCommunication.checkResponseCode$(RestCommunication.scala:384)
at ai.h2o.sparkling.backend.H2OChunk$.checkResponseCode(H2OChunk.scala:32)
at ai.h2o.sparkling.backend.utils.RestCommunication.readURLContent(RestCommunication.scala:376)
at ai.h2o.sparkling.backend.utils.RestCommunication.readURLContent$(RestCommunication.scala:360)
at ai.h2o.sparkling.backend.H2OChunk$.readURLContent(H2OChunk.scala:32)
at ai.h2o.sparkling.backend.H2OChunk$.getChunkAsInputStream(H2OChunk.scala:53)
at ai.h2o.sparkling.backend.Reader.<init>(Reader.scala:45)
at ai.h2o.sparkling.backend.H2ODataFrame$$anon$1.reader$lzycompute(H2ODataFrame.scala:74)
at ai.h2o.sparkling.backend.H2ODataFrame$$anon$1.reader(H2ODataFrame.scala:66)
at ai.h2o.sparkling.backend.H2OSparkEntity$H2OChunkIterator.hasNext(H2OSparkEntity.scala:50)
at ai.h2o.sparkling.backend.H2OSparkEntity$H2OChunkIterator.hasNext$(H2OSparkEntity.scala:50)
at ai.h2o.sparkling.backend.H2ODataFrame$$anon$1.hasNext(H2ODataFrame.scala:64)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$5(UnsafeRowBatchUtils.scala:88)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$3(UnsafeRowBatchUtils.scala:88)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$1(UnsafeRowBatchUtils.scala:68)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$2(Collector.scala:197)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:899)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:902)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:797)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Code to reproduce the issue
import functools
from pyspark.sql import DataFrame

output_dfs = []
for discount in price_discounts:
    # generate the discount-dependent columns (details elided)
    df = df.withColumn(...)
    hf_df = hc.asH2OFrame(df)
    preds_delta = model_delta.predict(hf_df)
    error_preds_delta = error_model_delta.predict(hf_df)
    preds_eol = model_eol.predict(hf_df)
    error_preds_eol = error_model_eol.predict(hf_df)
    hf_df = (hf_df
             .cbind(preds_delta)
             .cbind(error_preds_delta * 0.5)
             .cbind(preds_eol)
             .cbind(error_preds_eol * 0.5))
    new_df = hc.asSparkFrame(hf_df)
    output_dfs.append(new_df)

# Outside of the loop I do this, but get the error
# "RestApiCommunicationException: H2O node http://10.159.20.11:54321 responded with"
predictions = functools.reduce(DataFrame.union, output_dfs)
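One thing I am considering (an assumption on my side, not something confirmed): the stack trace shows the chunks being read from H2O only when the final union is collected, so the DataFrame returned by asSparkFrame seems to be evaluated lazily against the backing H2O frame, which may no longer exist by then. A sketch of forcing materialization inside the loop instead:

    # (sketch, untested) inside the loop: materialize each converted frame
    # right away so the final union no longer needs the H2O frame
    new_df = hc.asSparkFrame(hf_df).persist()
    new_df.count()            # forces the read from H2O while the frame still exists
    output_dfs.append(new_df)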
Hey @tiagomotap,
I agree the loop might not be the best idea :) I would suggest rethinking how the problem could be solved by operating on a single dataset and trying to group the records.
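If it helps, a rough sketch of that single-dataset approach could look like the following (the derived column, the "price" column name, and the use of crossJoin are illustrative assumptions, not a confirmed recipe):

from pyspark.sql import functions as F

# Build one row per (product, discount) instead of looping: cross-join the
# base data with the discount list, derive the discount-dependent columns
# once, then convert and predict a single time.
discounts_df = spark.createDataFrame([(d,) for d in price_discounts], ["discount"])
expanded_df = df.crossJoin(discounts_df)
expanded_df = expanded_df.withColumn(
    "discounted_price", F.col("price") * (1 - F.col("discount"))  # illustrative derived column
)

hf_all = hc.asH2OFrame(expanded_df)
preds_delta = model_delta.predict(hf_all)        # repeat for the other three models as needed
scored = hc.asSparkFrame(hf_all.cbind(preds_delta))

# The per-discount view is then just a filter or groupBy on "discount".
scored.groupBy("discount").count().show()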