Azure / azure-kusto-spark

Apache Spark Connector for Azure Kusto

Retry for ingestion resources #356

Closed: asaharn closed this pull request 7 months ago

asaharn commented 8 months ago

Pull Request Description

Added a retry mechanism for cases where the service returns no storage containers for ingestion; a sketch of the retry approach follows the error trace below.

Error trace:

ERROR: Query termination received for [id=96300c8c-6de5-46bc-816e-ce9e78bd2e53, runId=722c6239-67b9-4182-b3b4-3c33c1e37f69], with exception: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 114, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/Workspace/Repos/aisfenix/adx-ingest/src/euvdb/jobs_uc.py", line 388, in adx_batch_write_gtr
    fut.result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Workspace/Repos/aisfenix/adx-ingest/src/euvdb/jobs_uc.py", line 430, in write_batch_date_gtr
    self.adx.write(adx_table_name, adx_df, adx_props, write_mode)
  File "/Workspace/Repos/aisfenix/adx-ingest/src/euvdb/clients_uc.py", line 268, in write
    .save()
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions/captured.py", line 188, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1732.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 251.0 failed 1 times, most recent failure: Lost task 0.0 in stage 251.0 (TID 13845) (10.231.232.29 executor driver): java.lang.RuntimeException: Failed to allocate temporary storage
    at com.microsoft.kusto.spark.utils.ContainerProvider.processContainerResults(ContainerProvider.scala:94)
    at com.microsoft.kusto.spark.utils.ContainerProvider.refresh(ContainerProvider.scala:83)
    at com.microsoft.kusto.spark.utils.ContainerProvider.getContainer(ContainerProvider.scala:47)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.getTempBlobForIngestion(ExtendedKustoClient.scala:114)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.createBlobWriter(KustoWriter.scala:249)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRows(KustoWriter.scala:331)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoKusto(KustoWriter.scala:190)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestToTemporaryTableByWorkers(KustoWriter.scala:236)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoTempTbl(KustoWriter.scala:171)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7(KustoWriter.scala:137)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7$adapted(KustoWriter.scala:137)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1086)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1086)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2999)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:897)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:900)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:795)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3588)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3506)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3506)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1516)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1516)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1516)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3835)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3747)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3735)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1240)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1228)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2942)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2980)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2999)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:3024)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1086)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:448)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1084)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:137)
    at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:49)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:82)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:79)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:272)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:272)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:274)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:498)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:201)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1113)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:151)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:447)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:271)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:245)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:266)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:251)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:316)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:312)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:441)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:251)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:372)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:251)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:203)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:200)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:336)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:956)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:424)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:391)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Failed to allocate temporary storage
    at com.microsoft.kusto.spark.utils.ContainerProvider.processContainerResults(ContainerProvider.scala:94)
    at com.microsoft.kusto.spark.utils.ContainerProvider.refresh(ContainerProvider.scala:83)
    at com.microsoft.kusto.spark.utils.ContainerProvider.getContainer(ContainerProvider.scala:47)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.getTempBlobForIngestion(ExtendedKustoClient.scala:114)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.createBlobWriter(KustoWriter.scala:249)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRows(KustoWriter.scala:331)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoKusto(KustoWriter.scala:190)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestToTemporaryTableByWorkers(KustoWriter.scala:236)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoTempTbl(KustoWriter.scala:171)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7(KustoWriter.scala:137)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7$adapted(KustoWriter.scala:137)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1086)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1086)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2999)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:897)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:900)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:795)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
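
For illustration, here is a minimal sketch of the retry idea in Scala. This is not the connector's actual implementation: fetchIngestionContainers, maxAttempts, and baseDelayMs are hypothetical stand-ins for whatever ContainerProvider.refresh uses internally. The point is that an empty container list or a transient failure is retried with backoff before surfacing the "Failed to allocate temporary storage" error seen above.

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object IngestionResourceRetrySketch {
  // Hypothetical fetch: returns the storage containers granted by the service,
  // which may transiently be empty.
  def fetchIngestionContainers(): Seq[String] = Seq.empty // placeholder

  @tailrec
  def getContainersWithRetry(attempt: Int = 1,
                             maxAttempts: Int = 3,
                             baseDelayMs: Long = 1000L): Seq[String] = {
    Try(fetchIngestionContainers()) match {
      case Success(containers) if containers.nonEmpty =>
        containers
      case result => // empty result or transient failure
        if (attempt >= maxAttempts) {
          // Out of attempts: surface the same error the trace shows.
          result match {
            case Failure(e) =>
              throw new RuntimeException("Failed to allocate temporary storage", e)
            case _ =>
              throw new RuntimeException("Failed to allocate temporary storage")
          }
        } else {
          // Exponential backoff before the next attempt: 1s, 2s, 4s, ...
          Thread.sleep(baseDelayMs * (1L << (attempt - 1)))
          getContainersWithRetry(attempt + 1, maxAttempts, baseDelayMs)
        }
    }
  }
}

Retrying here is reasonable because the ingestion-resources endpoint can transiently return an empty container list; failing the Spark task on the first empty response aborts the whole job, as the trace above shows.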

---

Future Release Comment

Fixes:

Retry fetching ingestion storage resources when none are returned, instead of failing immediately with "Failed to allocate temporary storage".

github-actions[bot] commented 8 months ago

Test Results

0 tests, 0 passed :white_check_mark:, 0s :stopwatch:
0 suites, 0 skipped :zzz:
0 files, 0 failed :x:

Results for commit 674b6a95.

:recycle: This comment has been updated with the latest results.