Azure / azure-kusto-spark

Apache Spark Connector for Azure Kusto

Unable to write spark dataframe to ADX table using kusto #406

Closed: LorenzF closed this issue 1 month ago

LorenzF commented 1 month ago

Describe the bug: When writing to Azure Data Explorer with the Kusto connector, the job fails with

'java.lang.NoSuchMethodError: com.microsoft.azure.kusto.data.ClientFactory.createClient'

To Reproduce

Databricks compute version: 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12)

Node type: Standard D4ads_v5

installed libraries (Maven coordinates, per the discussion below):

com.microsoft.azure.kusto:kusto-spark_3.0_2.12:5.2.2
com.microsoft.azure.kusto:kusto-data:5.2.0
com.microsoft.azure.kusto:kusto-ingest:5.2.0

Run the following script on Azure Databricks:

from pyspark.sql import SparkSession

# COMMAND ----------

# Optional: enable verbose connector logging (sc is the SparkContext that Databricks predefines)
sc._jvm.com.microsoft.kusto.spark.utils.KustoDataSourceUtils.setLoggingLevel("all")
# COMMAND ----------

pyKusto = SparkSession.builder.appName("kustoPySpark").getOrCreate()  # on Databricks this returns the already-running session
kustoOptions = {
    "kustoCluster": "<REDACTED>", 
    "kustoDatabase" : "<REDACTED>", 
    "kustoTable" : "<REDACTED>", 
    "kustoAadAppId": "<REDACTED>",
    "kustoAadAppSecret": "<REDACTED>", 
    "kustoAadAuthorityID": "<REDACTED>"
}
# Create a DataFrame for ingestion
df = spark.createDataFrame([("row-" + str(i), i) for i in range(1000)], ["name", "value"])

# COMMAND ----------

#######################
# BATCH SINK EXAMPLE  #
#######################

# Write data to a Kusto table
df.write. \
  format("com.microsoft.kusto.spark.datasource"). \
  option("kustoCluster",kustoOptions["kustoCluster"]). \
  option("kustoDatabase",kustoOptions["kustoDatabase"]). \
  option("kustoTable", kustoOptions["kustoTable"]). \
  option("kustoAadAppId",kustoOptions["kustoAadAppId"]). \
  option("kustoAadAppSecret",kustoOptions["kustoAadAppSecret"]). \
  option("kustoAadAuthorityID",kustoOptions["kustoAadAuthorityID"]). \
  option("tableCreateOptions","CreateIfNotExist"). \
  mode("Append"). \
  save()

It outputs:

Py4JJavaError: An error occurred while calling o584.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 257.0 failed 4 times, most recent failure: Lost task 2.3 in stage 257.0 (TID 1593) (10.139.64.4 executor driver): java.lang.NoSuchMethodError: com.microsoft.azure.kusto.data.ClientFactory.createClient(Lcom/microsoft/azure/kusto/data/auth/ConnectionStringBuilder;Lorg/apache/http/impl/client/CloseableHttpClient;)Lcom/microsoft/azure/kusto/data/Client;
    at com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl.<init>(QueuedIngestClientImpl.java:58)
    at com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl.<init>(QueuedIngestClientImpl.java:51)
    at com.microsoft.azure.kusto.ingest.IngestClientFactory.createClient(IngestClientFactory.java:48)
    at com.microsoft.azure.kusto.ingest.IngestClientFactory.createClient(IngestClientFactory.java:25)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.ingestClient$lzycompute(ExtendedKustoClient.scala:53)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.ingestClient(ExtendedKustoClient.scala:53)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestToTemporaryTableByWorkers(KustoWriter.scala:340)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoTempTbl(KustoWriter.scala:262)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7(KustoWriter.scala:211)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7$adapted(KustoWriter.scala:210)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1093)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1093)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:3123)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:211)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:134)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1032)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1035)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:922)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:3998)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3996)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3910)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3897)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3897)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1758)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1741)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1741)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4256)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4159)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4145)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1404)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1392)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3083)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:3064)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:3104)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:3123)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:3148)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1093)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:454)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1091)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:210)
    at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:70)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$2(commands.scala:84)
    at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:84)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:81)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:94)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$5(QueryExecution.scala:385)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$4(QueryExecution.scala:385)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:176)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:385)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:455)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:793)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:333)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:204)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:730)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:381)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1177)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:377)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:327)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:374)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:505)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:505)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:40)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:379)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:375)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:40)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:40)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:481)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:349)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:436)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:349)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:286)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:283)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:440)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1043)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:444)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:406)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.kusto.data.ClientFactory.createClient(Lcom/microsoft/azure/kusto/data/auth/ConnectionStringBuilder;Lorg/apache/http/impl/client/CloseableHttpClient;)Lcom/microsoft/azure/kusto/data/Client;
    at com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl.<init>(QueuedIngestClientImpl.java:58)
    at com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl.<init>(QueuedIngestClientImpl.java:51)
    at com.microsoft.azure.kusto.ingest.IngestClientFactory.createClient(IngestClientFactory.java:48)
    at com.microsoft.azure.kusto.ingest.IngestClientFactory.createClient(IngestClientFactory.java:25)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.ingestClient$lzycompute(ExtendedKustoClient.scala:53)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.ingestClient(ExtendedKustoClient.scala:53)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestToTemporaryTableByWorkers(KustoWriter.scala:340)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoTempTbl(KustoWriter.scala:262)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7(KustoWriter.scala:211)
    at com.microsoft.kusto.spark.datasink.KustoWriter$.$anonfun$write$7$adapted(KustoWriter.scala:210)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1093)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1093)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:3123)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:211)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:134)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1032)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1035)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:922)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
File <command-1154050524554058>, line 38
     19 df = spark.createDataFrame([("row-"+str(i),i)for i in range(1000)],["name", "value"])
     21 # COMMAND ----------
     22 
     23 #######################
   (...)
     26 
     27 # Write data to a Kusto table
     28 df.write. \
     29   format("com.microsoft.kusto.spark.datasource"). \
     30   option("kustoCluster",kustoOptions["kustoCluster"]). \
     31   option("kustoDatabase",kustoOptions["kustoDatabase"]). \
     32   option("kustoTable", kustoOptions["kustoTable"]). \
     33   option("kustoAadAppId",kustoOptions["kustoAadAppId"]). \
     34   option("kustoAadAppSecret",kustoOptions["kustoAadAppSecret"]). \
     35   option("kustoAadAuthorityID",kustoOptions["kustoAadAuthorityID"]). \
     36   option("tableCreateOptions","CreateIfNotExist"). \
     37   mode("Append"). \
---> 38   save()
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Expected behavior: The script should write the DataFrame to the Azure Data Explorer table; instead it fails with the error above.

Desktop (please complete the following information): N/A; running on Azure Databricks.

Thanks for your help!

ag-ramachandran commented 1 month ago

Please exclude the Kusto ingest and data jars; they are not needed. Then retry:

com.microsoft.azure.kusto:kusto-data:5.2.0
com.microsoft.azure.kusto:kusto-ingest:5.2.0
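
For reference, a minimal sketch of the same idea for spark-submit-style setups, using Spark's own dependency resolution. spark.jars.packages and spark.jars.excludes are standard Spark configs; they must be set before the JVM starts (e.g. in the cluster's Spark config on Databricks, not from an already-running notebook), and 5.2.3 is the version that ultimately works later in this thread:

from pyspark.sql import SparkSession

# Resolve only the connector uber jar; exclude the standalone kusto-data and
# kusto-ingest artifacts so they cannot shadow the classes the connector expects.
spark = (
    SparkSession.builder
    .appName("kustoPySpark")
    .config("spark.jars.packages",
            "com.microsoft.azure.kusto:kusto-spark_3.0_2.12:5.2.3")
    .config("spark.jars.excludes",
            "com.microsoft.azure.kusto:kusto-data,"
            "com.microsoft.azure.kusto:kusto-ingest")
    .getOrCreate()
)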

LorenzF commented 1 month ago

I tried the following (in the order mentioned):

So it does not seem to be the other libraries; rather, connector version 5.2.2 is not working for me. You can consider this issue resolved. Perhaps updating the README.md with the latest version would prevent others from hitting the same issue.

Thank you for the quick reply,

ag-ramachandran commented 1 month ago

The Kusto ingest and data jars are now packed as part of the uber jar, so you technically do not need them; they should be resolved automatically.

Are you using a Maven-based import or an uber jar from the GitHub releases for the test? I will try to replicate if there is anything I need to check.
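
One quick way to confirm which artifact actually supplied the failing class is to ask the driver JVM where it loaded ClientFactory from. This is a py4j reflection sketch, not part of the connector's API; the class name is taken from the stack trace above:

# If this prints a standalone kusto-data jar rather than the connector uber jar,
# the classpath conflict is confirmed. Note the original failure happened inside
# executor tasks, so the executors' classpath is what ultimately matters.
clazz = spark._jvm.java.lang.Class.forName(
    "com.microsoft.azure.kusto.data.ClientFactory"
)
print(clazz.getProtectionDomain().getCodeSource().getLocation().toString())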

LorenzF commented 1 month ago

I'm using the Maven-based import in Databricks.

Here is my current (working) setup of libraries for my compute cluster [screenshot of cluster libraries attached]; using kusto-spark_3.0_2.12:5.2.2 does not work.

I will remove the ingest and data jars soon.
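
For a Maven-based cluster library, the exclusion can also be declared in the library spec itself. A hedged sketch against the Databricks Libraries API 2.0 (the workspace URL, token, and cluster ID below are placeholders):

import requests

# Install only the connector coordinate as a cluster library, declaring the
# conflicting artifacts as Maven exclusions so they are never pulled in.
resp = requests.post(
    "https://<workspace-url>/api/2.0/libraries/install",
    headers={"Authorization": "Bearer <token>"},
    json={
        "cluster_id": "<cluster-id>",
        "libraries": [{
            "maven": {
                "coordinates": "com.microsoft.azure.kusto:kusto-spark_3.0_2.12:5.2.3",
                "exclusions": [
                    "com.microsoft.azure.kusto:kusto-data",
                    "com.microsoft.azure.kusto:kusto-ingest",
                ],
            }
        }],
    },
)
resp.raise_for_status()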

ag-ramachandran commented 1 month ago

Thanks for clarifying @LorenzF

5.2.2 does seem to have the class error on ADB 15.4.

I just tried 5.2.3 without the ingest and data jars and it works; attached is the screen recording I did for that. So we probably do not need to update the docs.

https://github.com/user-attachments/assets/0bae20d0-ab7a-4a6f-82c6-3da2b064cc48