Azure / azure-kusto-spark

Apache Spark Connector for Azure Kusto
Apache License 2.0

Stuck at connecting to Kusto #353

Closed: 321cyb closed this issue 5 months ago

321cyb commented 8 months ago

Describe the bug
We have a Spark batch job running in Azure Synapse, and we found that it had been stuck for several hours. I went to the Spark UI and saw that no active job/stage/task was running; below is the call stack of the driver:

```
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:464)
sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:237)
sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190)
sun.security.ssl.SSLTransport.decode(SSLTransport.java:109)
sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1401)
sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1309)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:440)
org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:396)
org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
com.microsoft.azure.kusto.data.auth.CloudInfo.retrieveCloudInfoForCluster(CloudInfo.java:95) => holding Monitor(java.util.HashMap@591852758})
com.microsoft.azure.kusto.data.auth.CloudInfo.retrieveCloudInfoForCluster(CloudInfo.java:70)
com.microsoft.azure.kusto.data.ClientImpl.validateEndpoint(ClientImpl.java:163)
com.microsoft.azure.kusto.data.ClientImpl.executeToJsonResult(ClientImpl.java:152)
com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:106)
com.microsoft.kusto.spark.utils.ExtendedKustoClient.$anonfun$executeEngine$1(ExtendedKustoClient.scala:387)
com.microsoft.kusto.spark.utils.ExtendedKustoClient$$Lambda$6210/1022461766.apply(Unknown Source)
com.microsoft.kusto.spark.utils.KustoDataSourceUtils$$anon$2.apply(KustoDataSourceUtils.scala:399)
io.github.resilience4j.retry.Retry.lambda$decorateCheckedSupplier$3f69f149$1(Retry.java:137)
io.github.resilience4j.retry.Retry$$Lambda$6213/215217709.apply(Unknown Source)
io.github.resilience4j.retry.Retry.executeCheckedSupplier(Retry.java:419)
com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.retryApplyFunction(KustoDataSourceUtils.scala:402)
com.microsoft.kusto.spark.utils.ExtendedKustoClient.executeEngine(ExtendedKustoClient.scala:388)
com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:66)
com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:49)
com.microsoft.kusto.spark.synapse.datasource.BaseDefaultSource.super$createRelation(DefaultSource.scala:23)
com.microsoft.kusto.spark.synapse.datasource.BaseDefaultSource.$anonfun$createRelation$1(DefaultSource.scala:23)
com.microsoft.kusto.spark.synapse.datasource.BaseDefaultSource$$Lambda$6156/200749467.apply(Unknown Source)
com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$.$anonfun$publishComponentEventFor$1(AppEventPublisher.scala:66)
com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$$$Lambda$6161/62244620.apply(Unknown Source)
scala.util.Try$.apply(Try.scala:213)
com.microsoft.spark.utils.CommonUtils$.executeFunction(CommonUtils.scala:55)
com.microsoft.spark.utils.CommonUtils$.getBlockTimeAndResult(CommonUtils.scala:36)
com.microsoft.kusto.spark.synapse.utils.ComponentEventPublisherEx$.publishComponentEventFor(AppEventPublisher.scala:65)
com.microsoft.kusto.spark.synapse.utils.EventPublisher.publishComponentEventFor(AppEventPublisher.scala:30)
com.microsoft.kusto.spark.synapse.datasource.BaseDefaultSource.createRelation(DefaultSource.scala:20)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) => holding Monitor(org.apache.spark.sql.execution.command.ExecutedCommandExec@1368534948})
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:152)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$6149/1427545502.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$3530/564426568.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$3524/1639901858.apply(Unknown Source)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:152)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:145)
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3009/1852522464.apply(Unknown Source)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:145)
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:129) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1563725729})
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:123)
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:183)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:382)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
com.microsoft.analytics.common.pipeline.sink.KustoLinkServiceSink.WriteData(KustoLinkServiceSink.scala:43)
com.microsoft.analytics.hostpool.app.batch.ConnectionByActivityId$.WriteData(ConnectionByActivityId.scala:98)
com.microsoft.analytics.hostpool.app.HostPoolMultiApplication.run(HostPoolMultiApplication.scala:13)
com.microsoft.analytics.hostpool.app.batch.ConnectionByActivityId$._main(ConnectionByActivityId.scala:40)
com.microsoft.analytics.hostpool.app.SparkBaseApplication.main(SparkBaseApplication.scala:34)
com.microsoft.analytics.hostpool.app.batch.ConnectionByActivityId.main(ConnectionByActivityId.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)
```
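The top of the dump shows the driver blocked in an Apache HttpClient TLS handshake read issued from CloudInfo.retrieveCloudInfoForCluster, and no timeout ever fired. For context, here is a minimal, hypothetical sketch of the kind of HttpClient 4.x timeout configuration that caps such blocking phases; it is not the connector's actual code, and the class name and values are illustrative only.

```java
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class TimeoutSketch {
    // Builds an HttpClient whose blocking phases are capped, so a stalled
    // connect, handshake, or response read fails fast instead of hanging a thread.
    public static CloseableHttpClient buildClient() {
        SocketConfig socketConfig = SocketConfig.custom()
                .setSoTimeout(30_000)                 // SO_TIMEOUT on the socket (ms); bounds blocking reads
                .build();
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(10_000)            // TCP connect (ms)
                .setSocketTimeout(30_000)             // per-request read timeout (ms)
                .setConnectionRequestTimeout(10_000)  // waiting for a pooled connection (ms)
                .build();
        return HttpClients.custom()
                .setDefaultSocketConfig(socketConfig)
                .setDefaultRequestConfig(requestConfig)
                .build();
    }
}
```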

To Reproduce
N/A, I only encountered this issue once.

Expected behavior
I expect a sensible timeout value for executeCollect so that it can retry after the timeout instead of hanging indefinitely. A caller-side mitigation is sketched below.
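Until the connector enforces its own timeout, one possible caller-side mitigation is to wrap the blocking write in a time limiter and retry on expiry. Below is a hedged sketch using resilience4j's TimeLimiter (the stack above shows the connector already uses resilience4j's Retry; the TimeLimiter module may need to be added separately). The class name and runKustoWrite() are hypothetical stand-ins, not connector APIs, and cancellation is best effort since a thread blocked in a socket read is not reliably interruptible.

```java
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedWriteSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TimeLimiter limiter = TimeLimiter.of(TimeLimiterConfig.custom()
                .timeoutDuration(Duration.ofMinutes(30)) // generous ceiling for the whole write
                .cancelRunningFuture(true)               // best effort; a blocked socket read may not be interrupted
                .build());
        try {
            // Throws TimeoutException instead of hanging forever; the caller can then retry or fail the job.
            limiter.executeFutureSupplier(
                    () -> CompletableFuture.supplyAsync(BoundedWriteSketch::runKustoWrite, pool));
        } finally {
            pool.shutdownNow();
        }
    }

    // Hypothetical stand-in for the Spark write to Kusto (DataFrameWriter.save()).
    private static Void runKustoWrite() {
        return null;
    }
}
```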

Screenshots
N/A



ag-ramachandran commented 8 months ago

This could be due to intermittent network issues from Synapse. Does this manifest often, or was it a one-off?