Azure / azure-cosmosdb-spark

Apache Spark Connector for Azure Cosmos DB
MIT License

Can't write to Cosmos DB #470

Closed kamtungc closed 2 years ago

kamtungc commented 2 years ago

I got "Writing job aborted" error when upserting data into a Cosmos DB

Version: com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.11.1

Configuration:

COSMOS_CFG = {
    "spark.cosmos.accountEndpoint": COSMOS_ENDPOINT,
    "spark.cosmos.accountKey": COSMOS_MASTERKEY,
    "spark.cosmos.database": COSMOS_DATABASE,
    "spark.cosmos.container": COSMOS_CONTAINER,
    "spark.cosmos.write.strategy": "ItemOverwrite",
    "spark.cosmos.write.bulk.enabled": "true",
    "spark.cosmos.throughputControl.name": COSMOS_CONTAINER + "DataIngestion",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.95",
    "spark.cosmos.throughputControl.globalControl.database": COSMOS_DATABASE,
    "spark.cosmos.throughputControl.globalControl.container": "ThroughputControl",
}

Function:

def update_entities(df):
    print("Starting ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
    df \
        .write \
        .format("cosmos.oltp") \
        .options(**COSMOS_CFG) \
        .mode("APPEND") \
        .save()
    print("Finished ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
    return

Error:

org.apache.spark.SparkException: Writing job aborted

Py4JJavaError Traceback (most recent call last)

in
----> 1 update_entities(df_selected)

in update_entities(df)
     15 def update_entities(df):
     16     print("Starting ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
---> 17     df \
     18         .write \
     19         .format("cosmos.oltp") \

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    736         self.format(format)
    737     if path is None:
--> 738         self._jwrite.save()
    739     else:
    740         self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
FabianMeiswinkel commented 2 years ago

Full callstack / error details please - without this, it is not actionable. The inner error will have information about what caused the job to abort.

kamtungc commented 2 years ago

Py4JJavaError Traceback (most recent call last)

in
----> 1 update_entities(df_input)

in update_entities(df)
     15 def update_entities(df):
     16     print("Starting ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
---> 17     df \
     18         .write \
     19         .format("cosmos.oltp") \

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    736         self.format(format)
    737     if path is None:
--> 738         self._jwrite.save()
    739     else:
    740         self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o3320.save.
: org.apache.spark.SparkException: Writing job aborted
  at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:727)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:395)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:339)
  at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:241)
  at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:318)
  at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:317)
  at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:241)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:130)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:172)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:319)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:146)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
  at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:113)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:269)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:130)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:485)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:485)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:461)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:126)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:126)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:102)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:153)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:947)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:346)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
  at sun.reflect.GeneratedMethodAccessor426.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
  at py4j.Gateway.invoke(Gateway.java:295)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:251)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 25 in stage 651.0 failed 4 times, most recent failure: Lost task 25.3 in stage 651.0 (TID 12744) (10.139.64.4 executor 0): {"ClassName":"BulkOperationFailedException","userAgent":"azsdk-java-cosmos/4.32.0-snapshot.1 Linux/5.4.0-1080-azure JRE/1.8.0_302","statusCode":409,"resourceAddress":null,"innerErrorMessage":"All retries exhausted for 'UPSERT' bulk operation - statusCode=[409:0] itemId=[e75841c9-534b-423f-b397-afbc2ad7d427], partitionKeyValue=[[{}]]","causeInfo":null,"responseHeaders":"{x-ms-substatus=0}"}
  at com.azure.cosmos.spark.BulkWriter.handleNonSuccessfulStatusCode(BulkWriter.scala:396)
  at com.azure.cosmos.spark.BulkWriter.$anonfun$subscriptionDisposable$2(BulkWriter.scala:168)
  at com.azure.cosmos.spark.BulkWriter.$anonfun$subscriptionDisposable$2$adapted(BulkWriter.scala:144)
  at azure_cosmos_spark.reactor.core.scala.publisher.package$.$anonfun$scalaConsumer2JConsumer$2(package.scala:45)
  at azure_cosmos_spark.reactor.core.scala.publisher.package$.$anonfun$scalaConsumer2JConsumer$2$adapted(package.scala:45)
  at scala.Option.foreach(Option.scala:407)
  at azure_cosmos_spark.reactor.core.scala.publisher.package$.$anonfun$scalaConsumer2JConsumer$1(package.scala:45)
  at azure_cosmos_spark.reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
  at azure_cosmos_spark.reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
  at azure_cosmos_spark.reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
  at azure_cosmos_spark.reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
  at azure_cosmos_spark.reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2973)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2920)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2914)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2914)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1334)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3123)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3111)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1096)
  at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2494)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2477)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:363)
  ... 45 more
Caused by: {"ClassName":"BulkOperationFailedException","userAgent":"azsdk-java-cosmos/4.32.0-snapshot.1 Linux/5.4.0-1080-azure JRE/1.8.0_302","statusCode":409,"resourceAddress":null,"innerErrorMessage":"All retries exhausted for 'UPSERT' bulk operation - statusCode=[409:0] itemId=[e75841c9-534b-423f-b397-afbc2ad7d427], partitionKeyValue=[[{}]]","causeInfo":null,"responseHeaders":"{x-ms-substatus=0}"}
  at com.azure.cosmos.spark.BulkWriter.handleNonSuccessfulStatusCode(BulkWriter.scala:396)
  at com.azure.cosmos.spark.BulkWriter.$anonfun$subscriptionDisposable$2(BulkWriter.scala:168)
  at com.azure.cosmos.spark.BulkWriter.$anonfun$subscriptionDisposable$2$adapted(BulkWriter.scala:144)
  at azure_cosmos_spark.reactor.core.scala.publisher.package$.$anonfun$scalaConsumer2JConsumer$2(package.scala:45)
  at azure_cosmos_spark.reactor.core.scala.publisher.package$.$anonfun$scalaConsumer2JConsumer$2$adapted(package.scala:45)
  at scala.Option.foreach(Option.scala:407)
  at azure_cosmos_spark.reactor.core.scala.publisher.package$.$anonfun$scalaConsumer2JConsumer$1(package.scala:45)
  at azure_cosmos_spark.reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
  at azure_cosmos_spark.reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
  at azure_cosmos_spark.reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
  at azure_cosmos_spark.reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
  at azure_cosmos_spark.reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  ... 1 more
FabianMeiswinkel commented 2 years ago

This error

{"ClassName":"BulkOperationFailedException","userAgent":"azsdk-java-cosmos/4.32.0-snapshot.1 Linux/5.4.0-1080-azure JRE/1.8.0_302","statusCode":409,"resourceAddress":null,"innerErrorMessage":"All retries exhausted for 'UPSERT' bulk operation - statusCode=[409:0] itemId=[e75841c9-534b-423f-b397-afbc2ad7d427], partitionKeyValue= for an upsert operation means that you have a unique key constraint configured on your container and at least one document you try to insert is violating the unique key constraint.

--> see this section of the Ingestion best practices for more details.
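One way to check which paths are constrained is to read the unique key policy from the container properties. A minimal sketch using the azure-cosmos Python SDK, assuming the same COSMOS_* variables as in the configuration above (not part of the connector API shown in the thread):

# Hedged sketch: inspect the target container's unique key policy and partition key
# with the azure-cosmos Python SDK (pip install azure-cosmos).
from azure.cosmos import CosmosClient

client = CosmosClient(COSMOS_ENDPOINT, credential=COSMOS_MASTERKEY)
container = (client
             .get_database_client(COSMOS_DATABASE)
             .get_container_client(COSMOS_CONTAINER))

props = container.read()  # container properties as a dict
# 'uniqueKeyPolicy' is only present when unique keys are configured,
# e.g. {'uniqueKeys': [{'paths': ['/name']}]}.
print(props.get("uniqueKeyPolicy", {}))
print(props.get("partitionKey"))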

You need to fix the input data to not violate the unique key constraint before you can ingest it.
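A minimal PySpark sketch of one way to do that: find the rows that collide on the unique key paths within the batch and keep a single row per combination before retrying the upsert. The column names below (pk for the partition key, name for the unique key path) are hypothetical placeholders; substitute the paths reported by the container's unique key policy. Note that this only removes duplicates within the DataFrame itself; a 409 can also come from an existing document in the container that already uses the same unique key values under a different id.

# Hedged sketch with hypothetical column names; unique keys are enforced per
# logical partition, so include the partition key column as well.
unique_cols = ["pk", "name"]

# Show which key combinations appear more than once in the batch.
(df_input
    .groupBy(*unique_cols)
    .count()
    .filter("count > 1")
    .show(truncate=False))

# One possible fix: keep a single row per unique-key combination, then retry the write.
df_deduped = df_input.dropDuplicates(unique_cols)
update_entities(df_deduped)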