Azure / spark-cdm-connector

Intermittent concurrent exception with explicit write in Spark CDM latest jar #61

Closed debanjantcs closed 3 years ago

debanjantcs commented 3 years ago

Hi Team

I am getting an intermittent issue with explicit write in Spark CDM; the log is attached. This is not the only exception; I am getting different exceptions at times, but after a while the same piece of code works. I will add some more captured logs as well.

    df_new.write.format("com.microsoft.cdm") \
        .option("storage", storageAccountName) \
        .option("manifestPath", stageContainer + "/ADRMData/Product/root1.manifest.cdm.json") \
        .option("entity", entity_name) \
        .option("entityDefinitionPath", "/" + entity_name + ".cdm.json/" + entity_name) \
        .option("entityDefinitionModelRoot", "models/CDM" + "/Product") \
        .option("dataFolderFormat", "'Data'") \
        .option("format", "parquet") \
        .option("appId", clientID) \
        .option("appKey", clientIDSecret) \
        .option("tenantId", tenantID) \
        .mode("overwrite") \
        .save()

Attaching the log: ConcurrentExecutionError_Log.txt

Please suggest if there is any best practice to follow.

srichetar commented 3 years ago

Hi Debanjan,

Do you see this exception when you reference a particular file in entityDefinitionPath?

srichetar commented 3 years ago

Is it possible to share this dataset ("entity_name+.cdm.json" + Spark schema in df_new) so we can reproduce it on our end? Can you email asksparkcdm@microsoft.com?

debanjantcs commented 3 years ago

Hi Sricheta

I am attaching the entity JSON and dataset, but it is not happening for any particular entity. Moreover, if we run the code again, it results in some different exception, and eventually it is actually able to write valid parquet.

Issue_related_files.zip

srichetar commented 3 years ago

Are you trying this on Synapse or Databricks? Please share the driver logs if you ran on Databricks, or the storageAccountName if you ran on Azure Synapse.

debanjantcs commented 3 years ago

Trying it in Databricks. Please find the driver logs below; you can see the kind of errors that come up randomly across entities. Driver_Log.txt

debanjantcs commented 3 years ago

Adding some more captured logs. Please note that the exceptions are not consistent, and the same piece of code works after some time without anything being changed. FYI, the explicit-write code is kept inside a method in a common notebook, which is sometimes called from other notebooks written by multiple developers working on the same cluster.

stderr--2020-11-25--14-00.txt stderr--2020-11-25--15-00.txt

srichetar commented 3 years ago

Thanks for providing the dataset. I had to make the 2 modifications below in order to get the explicit write to run.

  1. Change the datatype of UpdatedDate in CdmEntity from dateTime to datetime; the datatypes in CDM are case sensitive. (I saw this error in the driver log you shared.)

  2. Since FlavorType extends CdmEntity, the 2 attributes from CdmEntity - UpdatedDate and UpdatedBy - are added at the top. The ordering of attributes matters when comparing the CDM schema with the Spark schema in explicit write, so I had to change the export_data.csv file to adhere to that ordering (see the reordering sketch below). Without changing it, I got the error below (which I also saw in the driver logs you shared with me):

    java.lang.Exception: The dataframe schema does not match the cdm schema for field "FlavorTypeId". Path : "FlavorType > FlavorTypeId"
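
To illustrate the ordering point in item 2, here is a minimal sketch (not part of the connector) that reorders the dataframe columns to match the entity's attribute order before the explicit write. The attribute list is shortened to the names mentioned in this thread (UpdatedDate, UpdatedBy, FlavorTypeId); in practice it would list every attribute of FlavorType in the order defined in FlavorType.cdm.json.

    # Hand-maintained attribute order mirroring the CDM entity definition
    # (inherited CdmEntity attributes first, then the entity's own attributes).
    cdm_attribute_order = ["UpdatedDate", "UpdatedBy", "FlavorTypeId"]

    # Fail fast if the dataframe is missing any CDM attribute, then reorder
    # the columns so they line up with the CDM schema field by field.
    missing = set(cdm_attribute_order) - set(df_new.columns)
    if missing:
        raise ValueError("Dataframe is missing CDM attributes: " + ", ".join(sorted(missing)))

    # df_aligned is what would then be passed to the com.microsoft.cdm explicit write.
    df_aligned = df_new.select(*cdm_attribute_order)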

I don't see the exception below when I try the explicit write with your dataset:

: java.util.concurrent.ExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at com.microsoft.cdm.utils.CDMModelCommon.getCDMSchemaTypesAsSeqFromPredefi

Are these entity files - '/ProductStatus.cdm.json' , '/JewelryProductMaterialCategory.cdm.json' being read concurrently?

debanjantcs commented 3 years ago

Hi Sricheta

Thanks for the feedback on points 1 and 2; I will incorporate them. Please find the answer below:

Are these entity files - '/ProductStatus.cdm.json', '/JewelryProductMaterialCategory.cdm.json' - being read concurrently? Answer: The explicit write is placed in one method, which is called in parallel by different notebooks. None of these entities are read concurrently, but all of them refer to a single manifest, which is referenced in the explicit write method. Let me know if that answers your question.

debanjantcs commented 3 years ago

Incorporated the 2 changes suggested. Attaching the updated log: stderr--2020-11-27--12-00.txt

debanjantcs commented 3 years ago

Hi Sricheta

Attaching the latest log from today; please find the exception there.

stderr--2020-11-30--14-00.txt

Py4JJavaError: An error occurred while calling o1705.save.
: java.util.concurrent.ExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

debanjantcs commented 3 years ago

java.util.concurrent.ExecutionException: java.lang.Exception: CdmCorpusDefinition | Unable to resolve the reference: 'datetime' to a known object, folderPath: '/', path: 'CdmEntity/hasAttributes/UpdatedDate/dataType/datetime/(ref)' | /CdmEntity/hasAttributes/UpdatedDate/dataType/datetime/(ref)

debanjantcs commented 3 years ago

java.util.concurrent.ExecutionException: java.lang.Exception: PersistenceLayer | Could not read '/StoreItem/StoreItem.cdm.json' from the 'SparkManifestLocation' namespace. Reason 'com.microsoft.commondatamodel.objectmodel.storage.StorageAdapterException: Could not read ADLS content at path: /StoreItem/StoreItem.cdm.json' | loadDocumentFromPathAsync

debanjantcs commented 3 years ago

Hi Sricheta

We are getting the different concurrent exceptions above during explicit write, but the JSONs and the required files are already present, and this was working earlier as expected. Please advise.

debanjantcs commented 3 years ago

Py4JJavaError: An error occurred while calling o1702.save.
: java.util.concurrent.ExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at com.microsoft.cdm.utils.CDMModelCommon.getCDMSchemaTypesAsSeqFromPredefinedSource(CDMModelCommon.scala:280)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$createWriterFactory$1.apply$mcV$sp(CDMDataSourceWriter.scala:158)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$createWriterFactory$1.apply(CDMDataSourceWriter.scala:150)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$createWriterFactory$1.apply(CDMDataSourceWriter.scala:150)
    at com.microsoft.cdm.log.SparkCDMLogger$.logEventToKustoForPerf(SparkCDMLogger.scala:43)
    at com.microsoft.cdm.write.CDMDataSourceWriter.createWriterFactory(CDMDataSourceWriter.scala:148)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:191)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:117)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1$$anonfun$apply$1.apply(SQLExecution.scala:112)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:98)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:74)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:184)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:282)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2011)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3310)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
    at com.microsoft.commondatamodel.objectmodel.cdm.CdmFolderDefinition.lambda$fetchDocumentFromFolderPathAsync$0(CdmFolderDefinition.java:171)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

bissont commented 3 years ago

Are these entity files - '/ProductStatus.cdm.json', '/JewelryProductMaterialCategory.cdm.json' - being read concurrently? Answer: The explicit write is placed in one method, which is called in parallel by different notebooks. None of these entities are read concurrently, but all of them refer to a single manifest, which is referenced in the explicit write method. Let me know if that answers your question.

Thanks.

This is currently not supported by the spark-cdm-connector. You'll need to ensure that only one write can occur at a time. If you are writing in parallel, the results are non-deterministic.
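
One way to enforce that, as a minimal sketch only: wrap the shared write method in a lock so that writes to the manifest are serialized. The helper name write_entity_to_cdm and the lock are hypothetical, and the variables (storageAccountName, stageContainer, clientID, clientIDSecret, tenantID) are the ones from the write call at the top of this issue. Note that a Python threading.Lock only serializes calls made within a single driver process; writes launched from separately attached notebooks or jobs would still have to be ordered by the orchestration layer (or by an external lock such as a blob lease).

    import threading

    # Hypothetical module-level lock in the common notebook: only one explicit
    # CDM write (and hence one update of root1.manifest.cdm.json) at a time.
    _cdm_write_lock = threading.Lock()

    def write_entity_to_cdm(df, entity_name):
        # Serialize all writes that touch the same manifest; concurrent writes
        # under one manifest are not supported by the connector.
        with _cdm_write_lock:
            (df.write.format("com.microsoft.cdm")
                .option("storage", storageAccountName)
                .option("manifestPath", stageContainer + "/ADRMData/Product/root1.manifest.cdm.json")
                .option("entity", entity_name)
                .option("entityDefinitionPath", "/" + entity_name + ".cdm.json/" + entity_name)
                .option("entityDefinitionModelRoot", "models/CDM" + "/Product")
                .option("dataFolderFormat", "'Data'")
                .option("format", "parquet")
                .option("appId", clientID)
                .option("appKey", clientIDSecret)
                .option("tenantId", tenantID)
                .mode("overwrite")
                .save())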

bissont commented 3 years ago

We are getting these above different concurrent exceptions during explicit write. But the JSONs and the required files are already present and it was working earlier as expected. Please advise

So this is an append/overwrite, correct? In that case, the root manifest file is updated. If there are other writes under the same manifest, they too will overwrite the same root manifest.

debanjantcs commented 3 years ago

Are these entity files - '/ProductStatus.cdm.json', '/JewelryProductMaterialCategory.cdm.json' - being read concurrently? Answer: The explicit write is placed in one method, which is called in parallel by different notebooks. None of these entities are read concurrently, but all of them refer to a single manifest, which is referenced in the explicit write method. Let me know if that answers your question.

Thanks.

This is currently not supported by the spark-cdm-connector. You'll need to ensure that only one write can occur at a time. If you are writing in parallel, the results are non-deterministic.

It is an overwrite. So you mean that if the same manifest is referenced by different notebooks at the same time, it is not supported? Moreover, it is happening for any single file as well. How can those threads be killed? Restarting the cluster does not solve the problem either.

bissont commented 3 years ago

The only concurrent access the Spark-CDM-Connector offers is for parallel reads, because it does not do any locking. If the manifest is referenced multiple times and two operations update the same manifest, it is a race condition with non-deterministic results.

I don't know which threads you are talking about, as I haven't experienced that issue, but if it's on Databricks, you'll have to get support from them. How about re-creating the cluster?

What do you mean by "it is happening for any single file"? Are you saying there is zero concurrency in that situation?

debanjantcs commented 3 years ago

I don't know which threads you are talking about, as I haven't experienced that issue, but if it's on Databricks, you'll have to get support from them. How about re-creating the cluster? To answer the above: even if we restart the cluster and only one notebook is running, it still results in a concurrent or thread-lock exception.

What do you mean by "it is happening for any single file"? Are you saying there is zero concurrency in that situation? Yes, there is no concurrency. Only one file is running and it still results in the same issue.

debanjantcs commented 3 years ago

Team, please advise us on this. We are getting the same issue even in a zero-concurrency situation. It is a blocker for us to proceed with the write.

debanjantcs commented 3 years ago

Hi Team,

We have been working with the 0.18.2 beta jar. It works fine for the issues reported above. Today we got one more error, shown below, and we are attaching the data and CDM JSON for it.

Py4JJavaError: An error occurred while calling o1861.save.
: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at com.microsoft.cdm.utils.CDMModelCommon.getCDMSchemaTypesAsSeqFromPredefinedSource(CDMModelCommon.scala:258)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$createWriterFactory$1.apply$mcV$sp(CDMDataSourceWriter.scala:171)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$createWriterFactory$1.apply(CDMDataSourceWriter.scala:152)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$createWriterFactory$1.apply(CDMDataSourceWriter.scala:152)
    at com.microsoft.cdm.log.SparkCDMLogger$.logEventToKustoForPerf(SparkCDMLogger.scala:43)
    at com.microsoft.cdm.write.CDMDataSourceWriter.createWriterFactory(CDMDataSourceWriter.scala:150)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:191)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:117)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1$$anonfun$apply$1.apply(SQLExecution.scala:112)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:98)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:74)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:184)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:282)
    at sun.reflect.GeneratedMethodAccessor730.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at com.microsoft.commondatamodel.objectmodel.cdm.projections.CdmProjection.constructProjectionContext(CdmProjection.java:311)
    at com.microsoft.commondatamodel.objectmodel.cdm.CdmEntityAttributeDefinition.constructResolvedAttributes(CdmEntityAttributeDefinition.java:450)
    at com.microsoft.commondatamodel.objectmodel.cdm.CdmObjectBase.fetchResolvedAttributes(CdmObjectBase.java:456)
    at com.microsoft.commondatamodel.objectmodel.cdm.CdmEntityDefinition.constructResolvedAttributes(CdmEntityDefinition.java:331)
    at com.microsoft.commondatamodel.objectmodel.cdm.CdmObjectBase.fetchResolvedAttributes(CdmObjectBase.java:456)
    at com.microsoft.commondatamodel.objectmodel.cdm.CdmEntityDefinition.lambda$createResolvedEntityAsync$2(CdmEntityDefinition.java:467)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Product_Data.zip

srichetar commented 3 years ago

As discussed on a separate email thread, the connector does not support CDM projections. Closing this, as removing the projection from the attributes worked fine in 0.18.2.