Azure / azure-kusto-spark

Apache Spark Connector for Azure Kusto
Apache License 2.0

ThrottleExceptions when writing data to ADX/Kusto #344

Closed ravikiransharvirala closed 5 months ago

ravikiransharvirala commented 10 months ago

Describe the bug

When writing data from Databricks to an ADX/Kusto cluster (Fabric), I'm seeing ThrottleExceptions that cause the writes to fail. These started occurring suddenly over the weekend, even though no changes were made to the code, the Kusto connector, or the table sizes.

spark env vars: zulu11-ca-amd64

spark version 12.2.x-scala2.12

Kusto Spark Connector Version: com.microsoft.azure.kusto:kusto-spark_3.0_2.12:4.0.2

Code

  df.write.format("com.microsoft.kusto.spark.datasource")\
              .option("kustoCluster", kusto_config['cluster_uri'])\
              .option("kustoDatabase", "<database>")\
              .option("KustoTable", "<table-name>")\
              .option("kustoAadAppId", kusto_config['client_id'])\
              .option("kustoAadAppSecret", kusto_config['client_secret'])\
              .option("kustoAadAuthorityID", kusto_config['tenant_id'])\
              .mode("Append")\
              .save()

Kusto Capacity Limits Ingestions: 12

I see the consumed ingestion capacity reach 100% while the job runs. This wasn't an issue before, but since the weekend I have been seeing the ThrottleException below.

com.microsoft.azure.kusto.data.exceptions.ThrottleException: Request was throttled, too many requests.

Py4JJavaError: An error occurred while calling o3513.load.
: com.microsoft.azure.kusto.data.exceptions.ThrottleException: Request was throttled, too many requests.
    at com.microsoft.azure.kusto.data.Utils.post(Utils.java:92)
    at com.microsoft.azure.kusto.data.ClientImpl.executeToJsonResult(ClientImpl.java:156)
    at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:105)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.$anonfun$executeEngine$1(ExtendedKustoClient.scala:394)
    at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$$anon$2.apply(KustoDataSourceUtils.scala:398)
    at io.github.resilience4j.retry.Retry.lambda$decorateCheckedSupplier$3f69f149$1(Retry.java:137)
    at io.github.resilience4j.retry.Retry.executeCheckedSupplier(Retry.java:419)
    at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.retryApplyFunction(KustoDataSourceUtils.scala:401)
    at com.microsoft.kusto.spark.utils.ExtendedKustoClient.executeEngine(ExtendedKustoClient.scala:395)
    at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.getSchema(KustoDataSourceUtils.scala:175)
    at com.microsoft.kusto.spark.datasource.KustoRelation.getSchema(KustoRelation.scala:145)
    at com.microsoft.kusto.spark.datasource.KustoRelation.schema(KustoRelation.scala:43)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:491)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:378)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:334)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:334)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
    at jdk.internal.reflect.GeneratedMethodAccessor334.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.base/java.lang.Thread.run(Thread.java:829)

ag-ramachandran commented 10 months ago

Hello @ravikiransharvirala

This looks like an underlying capacity issue rather than a connector issue, and it will require investigation. With everything on the connector side unchanged, it could well be that limits are being hit by parallel runs or by other workloads using the capacity. Since this needs a look into the cluster, it would be great if you could raise a support request or, alternatively, send me the cluster details at my Microsoft handle (ramacg) so that I can take it up further.

cc: @asaharn

ravikiransharvirala commented 10 months ago

@ag-ramachandran Thanks for responding. I shared cluster details via email. Please let me know if you haven't received it.

ag-ramachandran commented 10 months ago

@ravikiransharvirala Not yet!

ravikiransharvirala commented 10 months ago

@ag-ramachandran interesting. I sent it to ramacg at microsoft.

I sent it again without links and images.

ag-ramachandran commented 10 months ago

@ravikiransharvirala got it now. Will check

ag-ramachandran commented 10 months ago

@ravikiransharvirala, please share the logs from STDOUT/STDERR/Log4j so that I can correlate the timing of these errors with capacity usage.

ravikiransharvirala commented 10 months ago

@ag-ramachandran Sure, will do that.

ravikiransharvirala commented 10 months ago

@ag-ramachandran Sent it as text. Let me know if you haven't received it.

ag-ramachandran commented 10 months ago

@ravikiransharvirala no exceptions in that log though!

ravikiransharvirala commented 10 months ago

@ag-ramachandran Maybe the message was truncated. I sent you a new email with the exception stack trace.

ravikiransharvirala commented 10 months ago

@ag-ramachandran Do you recommend persisting the DataFrame after reading the data through the Kusto connector? After reading the data from the database and performing transformations on it, I notice the connector making calls to the database throughout the job's execution.

These are the two queries I noticed while running the job (the job needs the entire data from the table):

| count
| evaluate estimate_rows_count()

ag-ramachandran commented 10 months ago

@ravikiransharvirala, I need more specifics. If you are trying to read the same data again and again, it makes sense to cache it. These queries are used to determine how the data is read; the internals of reading differ between ForceSingleMode and ForceDistributedMode. In your case you can set readMode to ForceDistributedMode, and I think some of these queries would go away.

If, in forced distributed mode (Parquet export), you want to reuse the same exported files, set the transient cache option to true.

KUSTO_READ_MODE 'readMode' - Override the connector heuristic to choose between 'Single' and 'Distributed' mode. Options are - 'ForceSingleMode', 'ForceDistributedMode'. Scala and Java users may take these options from com.microsoft.kusto.spark.datasource.ReadMode.

KUSTO_DISTRIBUTED_READ_MODE_TRANSIENT_CACHE When 'Distributed' read mode is used and this is set to 'true', the request query is exported only once and exported data is reused.

Read more at: https://github.com/Azure/azure-kusto-spark/blob/master/docs/KustoSource.md

P.S. This may not be related to the issue, but these are good options to set and try for optimized reads.
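
For illustration, the read-side suggestions above could look roughly like this in PySpark. This is a minimal sketch, not the connector's documented usage: only the `readMode` key and its `ForceDistributedMode` value come from the text above, while the `kustoQuery` and `distributedReadModeTransientCache` option strings and the helper names `build_read_options` and `read_and_cache` are assumptions that should be checked against KustoSource.md for the connector version in use.

```python
# Sketch only. 'kustoQuery' and 'distributedReadModeTransientCache' are assumed
# option strings; verify them in docs/KustoSource.md before relying on them.
KUSTO_FORMAT = "com.microsoft.kusto.spark.datasource"


def build_read_options(kusto_config, database, query, transient_cache=True):
    """Build connector read options (hypothetical helper, not part of the connector)."""
    return {
        "kustoCluster": kusto_config["cluster_uri"],
        "kustoDatabase": database,
        "kustoQuery": query,  # assumed option name for the source query
        "kustoAadAppId": kusto_config["client_id"],
        "kustoAadAppSecret": kusto_config["client_secret"],
        "kustoAadAuthorityID": kusto_config["tenant_id"],
        # Override the connector's heuristic and always export in distributed mode.
        "readMode": "ForceDistributedMode",
        # Assumed option name: export once and reuse the exported data.
        "distributedReadModeTransientCache": str(transient_cache).lower(),
    }


def read_and_cache(spark, options):
    """Read through the connector and mark the DataFrame for caching.

    persist() is lazy: the data is materialized by the first action, and later
    actions on the returned DataFrame reuse the cached data instead of
    querying the cluster again.
    """
    df = spark.read.format(KUSTO_FORMAT).options(**options).load()
    return df.persist()
```

In a notebook this might be called as `df = read_and_cache(spark, build_read_options(kusto_config, "<database>", "<table-name>"))`, with transformations and the eventual write applied to `df`. Whether this reduces throttling depends on the cluster's capacity, which is a separate question from the connector settings.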