Azure / azure-kusto-spark

Apache Spark Connector for Azure Kusto

com.microsoft.azure.kusto.data.auth.CloudDependentTokenProviderBase.initializeWithCloudInfo throws Null Pointer Exception #366

Closed: ConfieBlake closed this issue 5 months ago

ConfieBlake commented 6 months ago

I have imported com.microsoft.kusto.spark.datasink.KustoSinkOptions. The kusto-spark version is kusto-spark_3.0_2.12:5.0.6.

I have granted the Admin role to my service principal on my Azure Data Explorer cluster, and I can create tables with that service principal from an Azure Function.

But when the Spark Scala job runs, it throws a NullPointerException at com.microsoft.azure.kusto.data.auth.CloudDependentTokenProviderBase.initializeWithCloudInfo. I am not sure what is wrong; here is my code:

import org.apache.spark.sql.{SaveMode, SparkSession}
import com.microsoft.kusto.spark.datasink.KustoSinkOptions

val spark = SparkSession
  .builder()
  .appName("KustoDump")
  .getOrCreate()

// Read the input (sstream format).
// streamPath, streamName, cluster, database, tableName, AppId, AppKey
// are all defined elsewhere in the job.
val MyLog = spark.read.format("sstream")
  .load(streamPath + streamName)

// Write to Azure Data Explorer via the Kusto connector.
MyLog.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
  .option(KustoSinkOptions.KUSTO_DATABASE, database)
  .option(KustoSinkOptions.KUSTO_TABLE, tableName)
  .option(KustoSinkOptions.KUSTO_AAD_APP_ID, AppId)
  .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, AppKey)
  .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
  .mode(SaveMode.Append)
  .save()

My cluster, database, table name, and input stream path and name are all well-defined, and the app ID and app key are passed in. The input format is indeed sstream: the same DataFrame writes to an ADLS container successfully, but fails when writing to ADX.
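To isolate the credentials from the connector, this is roughly how the same service principal could be exercised directly through the underlying Kusto Java client (a sketch only; clusterUrl and tenantId are placeholders for my full cluster endpoint and AAD tenant id, which the Spark options above do not set via KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID):

import com.microsoft.azure.kusto.data.ClientFactory
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder

// Sketch: same service principal, but through the plain Kusto Java
// client instead of the Spark connector. Placeholders, not real values:
val clusterUrl = "https://<cluster>.<region>.kusto.windows.net"
val tenantId = "<aad-tenant-id>"

// AppId, AppKey, and database are the same values passed to the
// Spark options above.
val csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
  clusterUrl, AppId, AppKey, tenantId)
val client = ClientFactory.createClient(csb)
// Any control command forces token acquisition end to end.
client.execute(database, ".show tables")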

This is the relevant part of the pom.xml:

<dependency>
  <groupId>com.microsoft.azure.kusto</groupId>
  <artifactId>kusto-spark_3.0_2.12</artifactId>
  <version>5.0.6</version>
  <scope>compile</scope>
</dependency>

ag-ramachandran commented 6 months ago

Hello @ConfieBlake, thanks for reaching out.

A couple of quick questions:

a) Would it be possible to send me the full stack trace?
b) Is the Kusto cluster behind a private endpoint, or do you have any VNET-specific rules applied?
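Also, one quick self-check you can run while you gather those: before acquiring a token, the client initializes itself from the cluster's auth metadata endpoint (v1/rest/auth/metadata, as far as I can tell), so a plain GET against it from the driver node tells you whether that first hop is reachable. A minimal sketch, with a placeholder for your cluster URL:

import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Sketch: probe the auth metadata endpoint the Kusto client reads
// during initialization. Placeholder, not a real value:
val clusterUrl = "https://<cluster>.<region>.kusto.windows.net"

val conn = new URL(s"$clusterUrl/v1/rest/auth/metadata")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("GET")
println(s"HTTP ${conn.getResponseCode}")
println(Source.fromInputStream(conn.getInputStream).mkString)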

Can you send them to my alias (ramacg)?

Regards,
Ram

ConfieBlake commented 6 months ago

Hi @ag-ramachandran, really appreciate your quick response!

For a), here is the full stack trace:

2024-02-20T23:50:34,368 ERROR [Driver] yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException: null
    at com.microsoft.azure.kusto.data.auth.CloudDependentTokenProviderBase.initializeWithCloudInfo(CloudDependentTokenProviderBase.java:47) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.auth.MsalTokenProviderBase.initializeWithCloudInfo(MsalTokenProviderBase.java:40) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.auth.ConfidentialAppTokenProviderBase.initializeWithCloudInfo(ConfidentialAppTokenProviderBase.java:36) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.auth.CloudDependentTokenProviderBase.initialize(CloudDependentTokenProviderBase.java:41) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.auth.TokenProviderBase.acquireAccessToken(TokenProviderBase.java:30) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.ClientImpl.generateIngestAndCommandHeaders(ClientImpl.java:405) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.ClientImpl.executeToJsonResult(ClientImpl.java:213) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.ClientImpl.executeImpl(ClientImpl.java:173) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.ClientImpl.lambda$execute$0(ClientImpl.java:122) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity.invoke(MonitoredActivity.java:33) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:121) ~[app.jar:?]
    at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:116) ~[app.jar:?]
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.$anonfun$executeEngine$1(ExtendedKustoClient.scala:388) ~[app.jar:?]
    at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$$anon$2.apply(KustoDataSourceUtils.scala:398) ~[app.jar:?]
    at io.github.resilience4j.retry.Retry.lambda$decorateCheckedSupplier$3f69f149$1(Retry.java:137) ~[app.jar:?]
    at io.github.resilience4j.retry.Retry.executeCheckedSupplier(Retry.java:419) ~[app.jar:?]
    at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.retryApplyFunction(KustoDataSourceUtils.scala:401) ~[app.jar:?]
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.executeEngine(ExtendedKustoClient.scala:389) ~[app.jar:?]
    at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:66) ~[app.jar:?]
    at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:49) ~[app.jar:?]
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399) ~[spark-sql_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
    at com.microsoft.datamining.utils.KustoDump$.main(KustoDump.scala:71) ~[app.jar:?]
    at com.microsoft.datamining.utils.KustoDump.main(KustoDump.scala) ~[app.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_202]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_202]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_202]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_202]
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:778) [spark-yarn_2.12-3.0.1-mt-SNAPSHOT.jar:3.0.1-mt-SNAPSHOT]
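One thing I notice in the trace: every Kusto SDK and connector frame resolves to ~[app.jar:?], so the SDK runs bundled inside our single application uber-jar, in case that matters.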

For b), it is not behind a private endpoint; it allows public access from all networks (if authenticated). I do not think I have any VNET rules.

I will send a copy to your alias.

Thanks!