arangodb / arangodb-spark-datasource

ArangoDB Connector for Apache Spark, using the Spark DataSource API
Apache License 2.0

[DE-616] ttl for cursor cannot be configured from ArangoDB Spark data source and getting cursor not found #46

Closed kundan59 closed 6 months ago

kundan59 commented 1 year ago

I am writing a data pipeline to ingest data from ArangoDB into BigQuery, using the ArangoDB Spark datasource.

Here is the code:

    from pyspark.sql import DataFrame

    # Read from ArangoDB via the Spark datasource, running a custom AQL query.
    df: DataFrame = spark.read.format("com.arangodb.spark") \
        .option("query", query) \
        .options(**arango_connection) \
        .schema(doc_schema) \
        .load()

    df.count()

    # Write the result into BigQuery using the direct write method.
    df.write.format("bigquery").mode("append") \
        .option("table", bq_table) \
        .option("project", bq_project) \
        .option("dataset", bq_dataset) \
        .option("writeMethod", "direct") \
        .option("credentialsFile", "path/of/gcp-credential") \
        .save()
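(Side note: df.count() and the subsequent df.write each run a complete read against ArangoDB, so the collection is scanned through two separate cursors. A minimal sketch that caches the DataFrame so ArangoDB is read only once; this does not change the per-batch TTL behaviour, it just removes the second scan:)

    from pyspark import StorageLevel

    df = spark.read.format("com.arangodb.spark") \
        .option("query", query) \
        .options(**arango_connection) \
        .schema(doc_schema) \
        .load()

    # Materialize the read once; the BigQuery write below is then served
    # from the Spark cache instead of opening a second ArangoDB cursor.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.count()

    df.write.format("bigquery").mode("append") \
        .option("table", bq_table) \
        .option("project", bq_project) \
        .option("dataset", bq_dataset) \
        .save()

    df.unpersist()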

This code works when the ArangoDB collection has fewer documents, e.g. writing 10,000 documents from ArangoDB to BigQuery.

But my collection will have more than 1,000,000 documents, and when testing with more than 50,000 documents the Spark job fails.

The error I am getting is: Caused by: org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (BS1ZN93 executor driver): org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
    at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Suppressed: com.arangodb.ArangoDBException: Response: 404, Error: 1600 - cursor not found
        at com.arangodb.internal.util.ResponseUtils.checkError(ResponseUtils.java:53)
        at com.arangodb.http.HttpCommunication.execute(HttpCommunication.java:86)
        at com.arangodb.http.HttpCommunication.execute(HttpCommunication.java:66)
        at com.arangodb.http.HttpProtocol.execute(HttpProtocol.java:44)
        at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:60)
        at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:48)
        at com.arangodb.internal.ArangoDatabaseImpl$1.close(ArangoDatabaseImpl.java:219)
        at com.arangodb.internal.cursor.ArangoCursorImpl.close(ArangoCursorImpl.java:69)
        at org.apache.spark.sql.arangodb.datasource.reader.ArangoQueryReader.close(ArangoQueryReader.scala:53)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$advanceToNextIter$1(DataSourceRDD.scala:94)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$advanceToNextIter$1$adapted(DataSourceRDD.scala:89)
        at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:132)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
        ... 10 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
    at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3459)
    at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3458)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:3458)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
    at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apac
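For context: error 1600 "cursor not found" is the server's answer when a query cursor's server-side TTL elapses between two batch fetches, after which the cursor is garbage-collected. A sketch of the same mechanics against the raw HTTP cursor API (endpoint, database, credentials and query below are placeholders):

    import requests

    base = "http://localhost:8529/_db/mydb"  # placeholder endpoint and database
    auth = ("root", "passwd")                # placeholder credentials

    # Create a server-side cursor. "ttl" is the idle time in seconds that the
    # server keeps the cursor alive between batch fetches; it defaults to 30
    # when omitted.
    body = requests.post(f"{base}/_api/cursor", json={
        "query": "FOR d IN mycollection RETURN d",  # placeholder query
        "batchSize": 1000,
        "ttl": 300,
    }, auth=auth).json()

    # Fetch the remaining batches. If more than "ttl" seconds pass between
    # two of these calls (e.g. because the consumer is busy elsewhere), the
    # next fetch fails with 404 / error 1600 "cursor not found".
    while body.get("hasMore"):
        body = requests.put(f"{base}/_api/cursor/{body['id']}", auth=auth).json()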

There is no option I could find to configure the cursor ttl, which defaults to 30 seconds. I tried reducing the batch size, but I still get the cursor-not-found issue.
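What I would like is something along these lines (batchSize is an existing documented read option; the ttl option is hypothetical and is exactly what this issue asks for):

    df = (
        spark.read.format("com.arangodb.spark")
        .option("query", query)
        # Existing option: larger batches mean fewer cursor round-trips.
        .option("batchSize", 10000)
        # Hypothetical option requested by this issue: keep the server-side
        # cursor alive for 300 seconds between batch fetches.
        .option("ttl", 300)
        .options(**arango_connection)
        .schema(doc_schema)
        .load()
    )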