Azure / spark-cdm-connector


java.lang.ArrayIndexOutOfBoundsException when working with CDM data #60

Closed: JurgenPostelmans closed this issue 3 years ago

JurgenPostelmans commented 3 years ago

Hi,

I don't know if it is related to another issue here on the forum, but I have the following scenario:

  1. Exported some data out of Dynamics 365 Customer Insights into CDM folders -> Customer data.
  2. Exported some other data out of Dynamics 365 for Sales into CDM folders -> Opportunity data, etc.

I run the following code to read some customer data:

```python
storageAccountName = "XXXX.dfs.core.windows.net"
filesystem = "YYYY"

dfCustomers = (spark.read.format("com.microsoft.cdm")
    .option("storage", storageAccountName)
    .option("manifestPath", filesystem + "/CustomerInsights_e5b1e222-1349-44d6-89f6-dd29306d921c/u2udlsgen2/model.json")
    .option("entity", "Customer")
    .load())
```

The following line of code still runs fine:

dfCustomers.filter(dfCustomers.FullName.isNotNull()).select("CustomerId","Fullname","State").show()

But the next line fails:

dfCustomers.filter(dfCustomers.FullName.isNotNull()).select("CustomerId","Fullname","State").groupBy("State").count().show()

And I can make it fail on lots of other operators with the same kind of error message. Any idea what could be wrong?

```
Py4JJavaError: An error occurred while calling o208.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, 5827603f18c44e67b40e5e607b7a7c8b002bb512957, executor 2): java.lang.ArrayIndexOutOfBoundsException: 27
  at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:61)
  at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:60)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:60)
  at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:19)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2110)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2150)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:368)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3397)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2558)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2558)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3378)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3377)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2558)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2772)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:262)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:299)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 27
  at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:61)
  at com.microsoft.cdm.read.CDMDataReader$$anonfun$1.apply(CDMDataReader.scala:60)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:60)
  at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:19)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  ... 1 more

Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 380, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o208.showString.
```

bissont commented 3 years ago

Hi,

I'm trying to understand your flow better:

If you run `dfCustomers.show()`, do you get the same error?

Since the `show()` without the `groupBy("State").count()` works, it doesn't seem related to the other issue (where the number of columns varies in the CSV file for an entity).
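
For reference, one way to check that hypothesis is to read the entity's partition files as raw text and count the fields per row. This is only a rough sketch: the path below is a placeholder for the partition location listed in your model.json, and the naive comma split ignores quoted fields, so treat the result as an approximation.

```python
from pyspark.sql import functions as F

# Hypothetical partition location -- substitute the CSV path(s) referenced
# by the Customer entity in model.json.
partition_path = "abfss://<filesystem>@<account>.dfs.core.windows.net/<entity-folder>/*.csv"

# Read each CSV line as plain text and count comma-separated fields per row.
raw = spark.read.text(partition_path)
field_counts = raw.select(F.size(F.split(F.col("value"), ",")).alias("numFields"))

# If more than one numFields value shows up, the rows have varying widths.
field_counts.groupBy("numFields").count().orderBy("numFields").show()
```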

JurgenPostelmans commented 3 years ago

Hi,

Maybe I should not have mentioned the Opportunity data, but I was really trying to join the two entities together, and that is where I started to get the unhandled exception.

So the bottom line is quite simple:

  1. I exported the Customer entity from Dynamics 365 Customer Insights into a CDM folder.
  2. Doing a `dfCustomers.show()` works fine; I see the first 20 records.
  3. The `dfCustomers.filter(dfCustomers.FullName.isNotNull()).select("CustomerId","Fullname","State").show()` also works fine.
  4. As soon as I try something more complex, like a groupBy or a join, I always get the ArrayIndexOutOfBoundsException error.

So that is where I'm stuck at...

Jurgen

bissont commented 3 years ago

I see. Does `dfCustomers.count()` work?

I don't have a good explanation for why it would fail only with more complex operations, since the connector doesn't do column projection. I'm now thinking it may be the issue where the number of columns varies in the dataset.
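
A small sketch of what I mean, using the dfCustomers DataFrame from the original post (purely diagnostic, not a fix):

```python
# count() forces a full scan through the connector with no grouping or join,
# so if it also throws the ArrayIndexOutOfBoundsException the failure is in
# reading the partition files rather than in the more complex operators.
dfCustomers.count()

# show() only materializes the first handful of rows, which is why it can
# succeed even when a full scan fails; stepping the limit up can help locate
# roughly where the problematic records start.
dfCustomers.limit(1000).count()
dfCustomers.limit(10000).count()
```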

Is it possible to share this dataset (model.json + data) so we can reproduce on our end? Can you email asksparkcdm@microsoft.com?

bissont commented 3 years ago

Closing this as we didn't receive the dataset at asksparkcdm. If you still face the issue, please create a new ticket.