Azure / spark-cdm

A Spark connector for the Azure Common Data Model
MIT License

Timestamp and Int32 data types are not recognized #11

Open Praveen-jsr opened 4 years ago

Praveen-jsr commented 4 years ago

I'm getting an exception while writing data into a CDM folder when a column's data type is either timestamp or Int32. The CSV files are generated in the snapshot folder, but the code fails while creating the model.json file.

org.apache.spark.SparkException: Writing job aborted.

Py4JJavaError                             Traceback (most recent call last)
<command-251668059500004> in <module>
     11                    .option("tenantId", tenantID)
     12                    .option("cdmFolder", outputLocation)
---> 13                    .option("cdmModelName", "dimTest")
     14                    .save())

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    735             self.format(format)
    736         if path is None:
--> 737             self._jwrite.save()
    738         else:
    739             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o494.save.
: org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:281)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: key not found: TimestampType
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:59)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$4.apply(CDMDataSourceWriter.scala:51)
    at com.microsoft.cdm.write.CDMDataSourceWriter$$anonfun$4.apply(CDMDataSourceWriter.scala:50)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
    at com.microsoft.cdm.write.CDMDataSourceWriter.commit(CDMDataSourceWriter.scala:50)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
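
From the trace, the failure appears to come from CDMDataSourceWriter.scala:51, where the connector looks up each Spark column type in a map of supported CDM types; TimestampType (and, per the title, IntegerType) has no entry, so the lookup throws NoSuchElementException. Until that map is extended, a pre-flight schema check can fail fast with a clearer message. A minimal sketch (find_unsupported_columns is a hypothetical helper, and the type list is an assumption based on this trace):

from pyspark.sql.types import TimestampType, IntegerType

# Assumption based on the trace above: these Spark types are missing
# from the connector's Spark-to-CDM type map.
UNSUPPORTED_TYPES = (TimestampType, IntegerType)

def find_unsupported_columns(df):
    """Return the names of columns whose types the writer cannot map."""
    return [field.name for field in df.schema.fields
            if isinstance(field.dataType, UNSUPPORTED_TYPES)]

bad_columns = find_unsupported_columns(df_sql)
if bad_columns:
    raise ValueError("Cast these columns before writing to CDM: %s" % bad_columns)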

Sharing sample code to reproduce the issue:

df_sql = spark.sql("SELECT cast('2019-12-26T00:00:00.000+0000' as timestamp) AS CalendarDate, cast(20191226 as int) AS DateKey")
df_sql.show()

tableType = "dimension"
outputLocation = storageLocation + "/" + tableType

(df_sql.write.format("com.microsoft.cdm")
                   .option("entity", "dimTest")
                   .option("appId", appID)
                   .option("appKey", appKey)
                   .option("tenantId", tenantID)
                   .option("cdmFolder", outputLocation)
                   .option("cdmModelName", "dimTest")
                   .save())

The Int32 issue can be worked around by casting to bigint, but it would be great if the connector handled this internally.
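
A cast-based workaround for both columns might look like the following sketch (df_safe is a hypothetical name; note that rendering the timestamp as a string changes how it is stored in the CSV):

from pyspark.sql.functions import col, date_format

# Cast the int column to bigint and render the timestamp as an ISO-8601
# string, so the writer never sees IntegerType or TimestampType.
df_safe = (df_sql
    .withColumn("DateKey", col("DateKey").cast("bigint"))
    .withColumn("CalendarDate",
                date_format(col("CalendarDate"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")))

df_safe can then be written with the same options as above.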

billgib commented 4 years ago

Hi @Praveen-jsr, thanks for your feedback. This library is now obsolete; please look at the work we're doing on the spark-cdm-connector at https://github.com/Azure/spark-cdm-connector. We have done work there to handle a wide range of data types, and if you run into any problems please report them there.
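
For example, a write with the newer connector looks roughly like this (a sketch based on the spark-cdm-connector README; storageAccountName is a placeholder, and the option names may have changed since, so check that repo's samples for the current API):

# Assumes the spark-cdm-connector JAR is attached to the cluster and that
# storageAccountName points at an ADLS Gen2 endpoint,
# e.g. "myaccount.dfs.core.windows.net".
(df_sql.write.format("com.microsoft.cdm")
    .option("storage", storageAccountName)
    .option("manifestPath", "mycontainer/dimTest/default.manifest.cdm.json")
    .option("entity", "dimTest")
    .option("appId", appID)
    .option("appKey", appKey)
    .option("tenantId", tenantID)
    .save())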