treeverse / lakeFS

lakeFS - Data version control for your data lake | Git for data
https://docs.lakefs.io
Apache License 2.0
[Bug]: Clients don't support rate limiting #5110

Open ortz opened 1 year ago

ortz commented 1 year ago

What happened?

Current Behavior: I launched a lakeFS instance with rate limiting configured, in order to avoid throttling by the backend engine (e.g. DynamoDB) and to put a high-water mark on the infrastructure's throughput. The task fails with ApiException: Too Many Requests raised by the client. Even when the task retries, it retries the same operation, so it fails again and eventually fails the job.

Example Jupyter notebook I used to test it:

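import org.apache.spark.sql.SaveMode  // needed for SaveMode.Overwrite below

// Public source data set and a limited sample of it
val AmazonReviews = spark.read.parquet("s3a://amazon-reviews-pds/parquet/")
val partial = AmazonReviews.limit(117647)
// Read the input from S3 and write it to two lakeFS repositories
val reviews = spark.read.parquet("s3://MY-REPO/stress-test/abc/b")
reviews.write.parquet("lakefs://test2/main/amazon-reviews")
reviews.write.parquet("lakefs://stress-dynamo/main/amazon-reviews")
// Run as a separate notebook cell, since `reviews` is redefined: read back from lakeFS
val reviews = spark.read.parquet("lakefs://stress-dynamo/test3/amazon-reviews")

// Rewrite as 4000 partitions, producing many concurrent lakeFS API calls
reviews.repartition(4000).write.mode(SaveMode.Overwrite).parquet("lakefs://stress-dynamo/test3/amazon-reviews-repartition-4000/")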

Expected Behavior

Expected Behavior: I would expect lakeFS clients to handle HTTP Status Code 429 (and possibly the additional response headers, such as Retry-After) and buffer/hold requests instead of failing.

lakeFS Version

0.90

Deployment

Kubernetes, Helm Chart, DynamoDB

Affected Clients

No response

Relevant logs output

Job aborted.
Caused by: Job aborted due to stage failure.
Caused by: IOException: statObject
Caused by: ApiException: Too Many Requests
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:659)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:365)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:202)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:198)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:591)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:178)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:591)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:567)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:198)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:198)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:183)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:174)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:228)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:898)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:1)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:45)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:47)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw.<init>(command-3805131591057868:49)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw.<init>(command-3805131591057868:51)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw.<init>(command-3805131591057868:53)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read.<init>(command-3805131591057868:55)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<init>(command-3805131591057868:59)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<clinit>(command-3805131591057868)
    at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print$lzycompute(<notebook>:7)
    at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print(<notebook>:6)
    at $line72eb909cf0414a729132966c8ac9cf5247.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:221)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:1145)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:1098)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$22(DriverLocal.scala:765)
    at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:41)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$20(DriverLocal.scala:750)
    at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:32)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
    at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:30)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:283)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:282)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:318)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:303)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:62)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:728)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:622)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:614)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:533)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:568)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:438)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:381)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:232)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 30.0 failed 4 times, most recent failure: Lost task 31.3 in stage 30.0 (TID 35121) (10.154.175.51 executor 10): java.io.IOException: statObject
    at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:751)
    at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
    at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Too Many Requests
    at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1031)
    at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
    at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1115)
    at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1089)
    at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:747)
    ... 32 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3288)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3222)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3216)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3216)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1415)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1415)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1415)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3497)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3438)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3426)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1169)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2715)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2698)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:330)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:202)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:198)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:591)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:178)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:591)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:567)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:198)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:198)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:183)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:174)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:228)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:898)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:1)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:45)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:47)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw.<init>(command-3805131591057868:49)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw.<init>(command-3805131591057868:51)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw.<init>(command-3805131591057868:53)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read.<init>(command-3805131591057868:55)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<init>(command-3805131591057868:59)
    at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<clinit>(command-3805131591057868)
    at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print$lzycompute(<notebook>:7)
    at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print(<notebook>:6)
    at $line72eb909cf0414a729132966c8ac9cf5247.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:221)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:1145)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:1098)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$22(DriverLocal.scala:765)
    at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:41)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$20(DriverLocal.scala:750)
    at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:32)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
    at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:30)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:283)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:282)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:318)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:303)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:62)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:728)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:622)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:614)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:533)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:568)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:438)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:381)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:232)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: statObject
    at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:751)
    at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
    at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Too Many Requests
    at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1031)
    at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
    at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1115)
    at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1089)
    at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:747)
    at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
    at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Contact Details

No response

arielshaqed commented 1 year ago

@ortz I understand why this happens for HadoopFS, but not why it would happen for the metadata client with GC. There, this should have been fixed by issue #4189 / PR #4190.

Did you observe GC getting stuck because of this?

arielshaqed commented 1 year ago

Why it happens in lakeFSFS

I observed this before we abandoned the work on LakeFSOutputCommitter. Like LakeFSFS, the committer would never retry a failed call, and API calls to lakeFS can fail under load.

Proposal

The solution in LakeFSOutputCommitter was to add a retry wrapper (the same one from #4190, in fact) around calls to the lakeFS API. This will solve most of the issue: Spark has a fixed number of executors, and each executor writes in a single-threaded manner, so backing off should be enough for the calls to succeed.
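
As a rough illustration of the retry-wrapper idea, here is a minimal sketch in Java. It is not the wrapper from #4190: the class name RetryOn429, the StatusException stand-in for the SDK's ApiException, and all parameters are hypothetical, chosen only to show retrying on HTTP 429 with capped exponential backoff.

```java
import java.util.concurrent.Callable;

public class RetryOn429 {
    /** Hypothetical stand-in for the SDK's ApiException; only the HTTP status code matters here. */
    public static class StatusException extends Exception {
        public final int code;
        public StatusException(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    /** Delay before retry number `attempt` (0-based): baseMillis * 2^attempt, capped at maxMillis. */
    public static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 20); // bound the shift to avoid overflow
        return Math.min(delay, maxMillis);
    }

    /** Runs `call`, retrying only on HTTP 429, for at most maxAttempts tries in total. */
    public static <T> T withRetry(Callable<T> call, int maxAttempts, long baseMillis, long maxMillis)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (StatusException e) {
                if (e.code != 429 || attempt + 1 >= maxAttempts) {
                    throw e; // not rate limiting, or out of attempts: surface the failure
                }
                Thread.sleep(backoffMillis(attempt, baseMillis, maxMillis));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated API call: rate-limited twice, then succeeds.
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new StatusException(429, "Too Many Requests");
            }
            return "ok";
        }, 5, 10, 1000);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "ok after 3 calls"
    }
}
```

In a real client the lambda would wrap the actual API call (statObject in the trace above), and other status codes would keep failing immediately, as they do today.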

Here we would ideally also respect the Retry-After header on the 429 response, as a minimum retry interval. In practice such headers are optimized more towards a single success than sustained throughput, so they may underestimate the time to retry; consider adjusting the value in any case.
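
To make the "minimum retry interval" idea concrete, here is a hedged Java sketch. The class RetryAfterDelay, its method names, and the scale factor are assumptions for illustration only, not part of any lakeFS client. It parses both standard forms of Retry-After (delta-seconds and an HTTP-date), then uses the scaled value as a floor under whatever backoff delay the client computed on its own.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class RetryAfterDelay {
    /**
     * Converts a Retry-After header value (delta-seconds or HTTP-date) to
     * milliseconds from `now`. Returns 0 if the header is absent or unparseable.
     */
    public static long retryAfterMillis(String header, Instant now) {
        if (header == null || header.trim().isEmpty()) {
            return 0L;
        }
        String value = header.trim();
        try {
            return Math.max(0L, Long.parseLong(value) * 1000L);
        } catch (NumberFormatException notSeconds) {
            // Not delta-seconds; try the HTTP-date form below.
        }
        try {
            Instant when = ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
            return Math.max(0L, Duration.between(now, when).toMillis());
        } catch (DateTimeParseException notADate) {
            return 0L;
        }
    }

    /**
     * Delay to use before the next retry: the client's own backoff, but never
     * less than the server's hint multiplied by `scale` (hypothetical tuning knob).
     */
    public static long nextDelayMillis(long backoffMillis, String retryAfterHeader, double scale, Instant now) {
        long floor = (long) (retryAfterMillis(retryAfterHeader, now) * scale);
        return Math.max(backoffMillis, floor);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-01-01T00:00:00Z");
        System.out.println(nextDelayMillis(200, "2", 1.5, now));                             // 3000
        System.out.println(nextDelayMillis(200, "Sun, 01 Jan 2023 00:00:05 GMT", 1.0, now)); // 5000
        System.out.println(nextDelayMillis(200, null, 1.5, now));                            // 200
    }
}
```

A scale above 1.0 is one way to pad a hint that underestimates the wait; the right value would have to be found by measurement.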

ortz commented 1 year ago

> @ortz I understand why this happens for HadoopFS, but not why it would happen for the metadata client with GC. There this should have been fixed by issue #4189 / PR #4190.
>
> Did you observe GC getting stuck because of this?

I haven't run GC to test it TBH, haven't got to it yet. It's something we'll need to do.

nopcoder commented 1 year ago

Handling the rate limit above depends on first extending the lakeFS OpenAPI with a retry-after (backoff) response, and then adding client support. Note that the issue above can also originate from client timeouts: for example, the generated Java SDK uses a per-request timeout of 10-30 seconds by default. This can cause a large number of requests. lakeFS itself uses the underlying storage SDK, which performs backoff and supports the header/mechanism above that we would like to provide to our clients.

github-actions[bot] commented 12 months ago

This issue is now marked as stale after 90 days of inactivity, and will be closed soon. To keep it, mark it with the "no stale" label.

arielshaqed commented 12 months ago

This is still an issue: server prevents rate-limiting on clients, which is poor behaviour.

See also #4957 and #5664 (which is blocked on fixing this); I will add others as I find them.