audienceproject / spark-dynamodb

Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB.
Apache License 2.0

Input type class org.apache.spark.sql.types.Decimal is not currently supported #57

Closed: gfn9cho closed this issue 3 years ago

gfn9cho commented 4 years ago

Hi,

Is the Decimal type not supported? Do I have to convert it explicitly? I am getting the error below while trying to load a table with a Decimal column into DynamoDB:

```
Caused by: java.lang.UnsupportedOperationException: Input type class org.apache.spark.sql.types.Decimal is not currently supported
    at com.amazonaws.services.dynamodbv2.document.Item.with(Item.java:1081)
    at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1$$anonfun$apply$7.apply(TableConnector.scala:134)
    at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1$$anonfun$apply$7.apply(TableConnector.scala:132)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1.apply(TableConnector.scala:132)
    at com.audienceproject.spark.dynamodb.connector.TableConnector$$anonfun$putItems$1.apply(TableConnector.scala:118)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.audienceproject.spark.dynamodb.connector.TableConnector.putItems(TableConnector.scala:118)
    at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.flush(DynamoBatchWriter.scala:56)
    at com.audienceproject.spark.dynamodb.datasource.DynamoBatchWriter.commit(DynamoBatchWriter.scala:48)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

gfn9cho commented 4 years ago

I cast the Decimal to Double and was able to load into DynamoDB successfully.

Akash1684 commented 4 years ago

Hi,

I have a use case where I want to consume and persist Decimal values into DynamoDB. Any guidance or code pointers for this?

gfn9cho commented 4 years ago

I just cast the Decimal to Double in Spark before writing to DynamoDB.
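
For reference, a minimal sketch of that workaround (`df` is a hypothetical DataFrame; note the precision caveat raised below):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DecimalType, DoubleType}

// Cast every DecimalType column to DoubleType before writing to DynamoDB.
val casted = df.schema.fields.foldLeft(df) { (acc, field) =>
  field.dataType match {
    case _: DecimalType => acc.withColumn(field.name, col(field.name).cast(DoubleType))
    case _              => acc
  }
}
```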

jacobfi commented 4 years ago

Obviously precision can be lost when doing a cast. This should be supported natively.
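
A quick illustration of the loss, using a hypothetical value with 19 significant digits (more than a Double's roughly 15 to 17):

```scala
// Double carries only ~15-17 significant decimal digits, so a wider
// decimal cannot survive the round trip through a cast.
val d = BigDecimal("12345678901234567.89")  // 19 significant digits
val viaDouble = BigDecimal(d.toDouble)      // 12345678901234568 (rounded)
println(d == viaDouble)                     // false: the cents are gone
```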

Akash1684 commented 4 years ago

@jacobfi Exactly the point, Jacob. For users, it essentially means the overhead of manipulating data in the persistence layer, which may or may not result in issues such as precision loss.

denizevrenci commented 4 years ago

I found a potential solution for this without losing precision:

Adding the following case in JavaConverter.convertRowValue solved it for me:

```scala
case t: DecimalType => row.getDecimal(index, t.precision, t.scale).toBigDecimal
```
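
For context, a standalone sketch of what that conversion does, outside the connector (the column type and value are hypothetical; Spark's internal Decimal is read back at full precision, and `toBigDecimal` yields a scala.math.BigDecimal, which is a java.lang.Number the AWS SDK's `Item.with` can handle):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{Decimal, DecimalType}

// Hypothetical column type and value, illustrating the suggested case.
val t = DecimalType(20, 2)
val row: InternalRow =
  InternalRow(Decimal(BigDecimal("12345678901234567.89"), t.precision, t.scale))

// Recovers the decimal exactly, unlike a cast to Double.
val exact: BigDecimal = row.getDecimal(0, t.precision, t.scale).toBigDecimal
```
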
jacobfi commented 3 years ago

Hi. Merged and published with https://github.com/audienceproject/spark-dynamodb/releases/tag/v1.1.1
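
With that release, a DataFrame containing DecimalType columns should write directly, e.g. as below (a sketch assuming the implicits API shown in this repo's README; "SomeTable" is a placeholder table name):

```scala
import com.audienceproject.spark.dynamodb.implicits._

// As of v1.1.1, Decimal values are converted internally, so no Double
// cast is needed. "SomeTable" is a placeholder; df is a DataFrame with
// DecimalType columns.
df.write.dynamodb("SomeTable")
```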