Describe the bug
Recently, with Spark 3.4, writes to Kusto (Azure Data Explorer) from a Spark streaming job fail with the exception below. Everything worked with Spark 3.3, but after upgrading to 3.4 (along with other library upgrades) the write now throws the following error.
Expected behavior
The data should be written to the database.
Error
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1396, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
1394 self.format(format)
1395 if path is None:
-> 1396 self._jwrite.save()
1397 else:
1398 self._jwrite.save(path)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o35663.save.
: java.lang.NoClassDefFoundError: com/twitter/util/TimeoutException
at com.microsoft.kusto.spark.synapse.utils.LSUtils.getLSCluster(SynapseLSRUtils.scala:57)
at com.microsoft.kusto.spark.synapse.utils.LSUtils.getLSCluster$(SynapseLSRUtils.scala:55)
at com.microsoft.kusto.spark.synapse.utils.SynapseLSUtils$.getLSCluster(SynapseLSRUtils.scala:68)
at com.microsoft.kusto.spark.synapse.utils.KustoLSDataSourceUtils.$anonfun$convertLinkedServiceToKustoParameters$3(KustoSynapseDataSourceUtils.scala:116)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$.$anonfun$publishComponentEventFor$1(AppEventPublisher.scala:66)
at scala.util.Try$.apply(Try.scala:213)
at com.microsoft.spark.utils.CommonUtils$.executeFunction(CommonUtils.scala:55)
at com.microsoft.spark.utils.CommonUtils$.getBlockTimeAndResult(CommonUtils.scala:36)
at com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$.publishComponentEventFor(AppEventPublisher.scala:65)
at com.microsoft.kusto.spark.synapse.utils.EventPublisher.publishComponentEventFor(AppEventPublisher.scala:30)
at com.microsoft.kusto.spark.synapse.utils.KustoLSDataSourceUtils.convertLinkedServiceToKustoParameters(KustoSynapseDataSourceUtils.scala:111)
at com.microsoft.kusto.spark.synapse.utils.KustoLSDataSourceUtils.convertLinkedServiceToKustoParameters$(KustoSynapseDataSourceUtils.scala:84)
at com.microsoft.kusto.spark.synapse.utils.KustoSynapseLSDataSourceUtils$.convertLinkedServiceToKustoParameters(KustoSynapseDataSourceUtils.scala:170)
at com.microsoft.kusto.spark.synapse.datasource.BaseDefaultSource.$anonfun$createRelation$1(DefaultSource.scala:23)
at com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$.$anonfun$publishComponentEventFor$1(AppEventPublisher.scala:66)
at scala.util.Try$.apply(Try.scala:213)
at com.microsoft.spark.utils.CommonUtils$.executeFunction(CommonUtils.scala:55)
at com.microsoft.spark.utils.CommonUtils$.getBlockTimeAndResult(CommonUtils.scala:36)
at com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$.publishComponentEventFor(AppEventPublisher.scala:65)
at com.microsoft.kusto.spark.synapse.utils.EventPublisher.publishComponentEventFor(AppEventPublisher.scala:30)
at com.microsoft.kusto.spark.synapse.datasource.BaseDefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:152)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:120)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:209)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:105)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:152)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:145)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:145)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:129)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:123)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:200)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:897)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:412)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:379)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
To Reproduce
df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<linked service name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoTable", "<table>") \
    .option("tableCreateOptions", "CreateIfNotExist") \
    .mode("Append") \
    .save()
@aayushsin When you upgraded to Spark 3.4, did you save the linked service and publish your workspace? The connector revisions have not changed between versions in Synapse, so it is most likely an issue with the linked service not being published.
I was using an existing Synapse Linked Service. The issue does not come up with Spark 3.3, only with Spark 3.4. I am guessing the issue exists because Spark 3.4 is in preview in Synapse. @ag-ramachandran Can you please check the integration of Spark 3.4 with an existing Linked Service?
While I try to replicate, @aayushsin, please delete that linked service, create it again, publish the workspace, and run the notebook again.
@aayushsin, I can replicate. While I get a fix for the preview, you can use the following:
Let me know once you try the above, @aayushsin. Will try and catch the 3.4 GA release in Synapse with this.
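The snippet that "the following" referred to is not preserved in this copy of the thread. Purely as an illustrative sketch of that style of workaround (bypassing the Synapse linked-service data source, fetching an access token for the linked service with mssparkutils.credentials.getConnectionStringOrCreds, and writing through the open-source Kusto connector), it might look roughly like this; the option names come from the open-source connector, and the linked service name, cluster URL, database, and table are placeholders, not values from the thread:

# Hedged sketch only; not the exact workaround posted in the thread.
# Assumes the open-source Kusto Spark connector ("com.microsoft.kusto.spark.datasource")
# is available on the pool, and that the linked service token can be used as an access token.
from notebookutils import mssparkutils  # available on Synapse Spark pools

access_token = mssparkutils.credentials.getConnectionStringOrCreds("<linked service name>")

df.write \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "https://<cluster>.<region>.kusto.windows.net") \
    .option("kustoDatabase", "<database>") \
    .option("kustoTable", "<table>") \
    .option("accessToken", access_token) \
    .option("tableCreateOptions", "CreateIfNotExist") \
    .mode("Append") \
    .save()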
Hi @aayushsin, a new release has been created fixing this issue in the connector. It will be rolled out in the coming weeks with the Synapse release schedule.
I'm still getting the error. Has the fix already been rolled out?
Hello @Krumelur, unfortunately not; there has been an issue with this rollout. Give us until next week and we will get a rollout done. In the meanwhile, the workaround is the following:
Thanks. For others who might encounter the issue, here's a Python version of the workaround for reading Kusto data into a dataframe:
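The Python snippet itself did not survive in this copy of the thread. A comparable, unofficial read-side sketch under the same assumptions as the write sketch above (open-source connector plus a token from the linked service; all names are placeholders) could look like:

# Hedged sketch; assumes the Synapse notebook context (spark session and mssparkutils available).
from notebookutils import mssparkutils

access_token = mssparkutils.credentials.getConnectionStringOrCreds("<linked service name>")

df = spark.read \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "https://<cluster>.<region>.kusto.windows.net") \
    .option("kustoDatabase", "<database>") \
    .option("kustoQuery", "<table or KQL query>") \
    .option("accessToken", access_token) \
    .load()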
@ag-ramachandran could you provide an update on the fix deployment? We're also looking to upgrade our Spark pool to 3.4 but are currently blocked by the Kusto connector timeout failure.
Is there a different workaround for linked services using managed identities? Hitting this error when calling getConnectionStringOrCreds():
Py4JJavaError: An error occurred while calling z:mssparkutils.credentials.getConnectionStringOrCreds. :
com.microsoft.azure.synapse.tokenlibrary.TokenServiceClientResponseStatusException: Token Service returned 'Client Error' (400), with message:
{"result":"DependencyError","errorId":"BadRequest","errorMessage":"[Code=CredentialTypeNotSupported, Target=kusto_sips_insights, Message=Failed to load LinkedService, Exception: Credential: default is of type UAMI and is not supported]. TraceId : 5de4fe7d-b528-48bf-8828-a147a8c83d18 | client-request-id : 3f08a5cd-4427-4ce0-a25d-333001584c44. Error Component : LSR"}
at com.microsoft.azure.synapse.tokenlibrary.TokenServiceClient.invokePostApi(TokenServiceClient.scala:139)
at com.microsoft.azure.synapse.tokenlibrary.TokenServiceClient.callLinkedServiceApi(TokenServiceClient.scala:168)
at com.microsoft.azure.synapse.tokenlibrary.TokenLibraryInternal.tokenServiceCall$1(TokenLibrary.scala:112)
Hi @mmaitre314,
Is it a UserManagedIdentity or a SystemManagedIdentity?
If it is a UMI, it is not supported on the Synapse platform (not a connector issue).
P.S. The fix for this is already made; we are waiting for the rollout to happen.
I used an alternative workaround to get the access token. In case that helps others:
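The alternative snippet is not included in this copy of the thread. One generic way to obtain a Kusto access token without going through the Token Library, assuming the azure-identity package is installed on the pool and the identity running the job has access to the cluster (an assumption, not something confirmed in this thread), would be something like:

# Hedged sketch; not necessarily the approach used above.
# Requires the azure-identity package; the cluster URL is a placeholder.
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Kusto uses the cluster URL itself as the token resource/scope.
access_token = credential.get_token("https://<cluster>.<region>.kusto.windows.net/.default").token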
This is fixed now in 3.4 and will work with linked services as-is.
Writes:
Reads in single mode:
Reads in distributed mode:
Creating a new Spark pool should upgrade to the latest lib and work.
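The write and read snippets referenced under "Writes:" and "Reads:" above were not carried over into this copy. As a hedged sketch of what a linked-service read might look like with the Synapse Kusto data source: the write path would match the To Reproduce snippet above, while reads would presumably use a query plus a read mode. The kustoQuery and readMode option names here are taken from the open-source connector, not confirmed by this thread, and all values are placeholders:

# Hedged sketch; option names assumed from the open-source Kusto connector.
df_single = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<linked service name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoQuery", "<table or KQL query>") \
    .option("readMode", "ForceSingleMode") \
    .load()

# A distributed read (for large result sets) would presumably just switch the mode:
df_distributed = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<linked service name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoQuery", "<table or KQL query>") \
    .option("readMode", "ForceDistributedMode") \
    .load()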
@ag-ramachandran, what is the path to get this fix if we already upgraded pools to 3.4 and implemented the earlier suggested workaround?
@jburb, the connector has been rolled out in the latest Synapse version.
However, the details of how this reaches an existing pool are probably specific to the Synapse pool itself, and we may have to ask the Synapse team.
If your pools have not been restarted, could you try a restart, or create a new pool and see if that works? (You would still be able to use all artifacts from your workspace, and features like MI, even if a new pool is created.)