arangodb / arangodb-spark-connector


Provide custom serializer with `WriteOptions` and `ReadOptions` #29

Closed reynoldsm88 closed 2 years ago

reynoldsm88 commented 2 years ago

I would like to be able to configure my own Jackson serializer and provide that to ArangoSpark for reading and writing records. This is related to my previous issue #28.

I've made speculative changes in my fork; however, I run into a problem when using them in a real scenario.

When I run an actual Spark job with these changes, I get an error about the task not being serializable, because ArangoJack is not serializable. I tried to follow the established pattern in the code for providing the serializer, but it seems like this approach is not going to work. What would be the best way to implement this feature?

This is the error I get when running my job with a SNAPSHOT version of the library based on my fork:

[error]         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
[error]         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
[error]         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[error]         at org.apache.spark.SparkContext.clean(SparkContext.scala:2477)
[error]         at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1019)
[error]         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error]         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[error]         at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
[error]         at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1018)
[error]         at com.arangodb.spark.ArangoSpark$.saveRDD(ArangoSpark.scala:121)
[error]         at com.arangodb.spark.ArangoSpark$.save(ArangoSpark.scala:59)
[error]         at Main$.main(Main.scala:37)
[error]         at Main.main(Main.scala)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.io.NotSerializableException: com.arangodb.mapping.ArangoJack
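
For reference, the failure mode can be reproduced with a minimal standalone sketch (made-up names, not the connector's actual code): any non-serializable object referenced from inside the closure gets shipped with the task and trips Spark's ClosureCleaner check.

```scala
import com.arangodb.mapping.ArangoJack
import org.apache.spark.sql.SparkSession

object ClosureCaptureSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val serializer = new ArangoJack() // created on the driver; ArangoJack is not Serializable

    spark.sparkContext.parallelize(Seq("a", "b")).foreachPartition { docs =>
      // Merely referencing `serializer` here forces Spark to ship it with the task,
      // which fails with java.io.NotSerializableException: com.arangodb.mapping.ArangoJack
      docs.foreach(doc => println(s"${serializer.getClass.getName}: $doc"))
    }

    spark.stop()
  }
}
```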
rashtao commented 2 years ago

Please note that this project will soon be deprecated in favor of ArangoDB Spark Datasource, which implements the Spark Datasource V2 API and overcomes some design limitations of this connector.

About your questions: we could release a new version of this connector containing the latest additions, i.e. using ArangoJack. Allowing custom Jackson modules to be registered should also be possible. I would recommend keeping the same approach as the original code, that is, always using ArangoJack and allowing additional modules to be provided.

About your fork, I think it does not work because ArangoJack needs to be instantiated on the executor side, not in the driver, i.e. within the foreachPartition block, as you can see here: https://github.com/arangodb/arangodb-spark-connector/blob/58ddf7c16e3b3a0afa2693c1304fc7e48dcb7b90/src/main/scala/com/arangodb/spark/ArangoSpark.scala#L123.

Note that custom Jackson modules provided in the configuration must then be serializable.
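
For illustration, a rough sketch of this pattern with hypothetical names and signatures; only the instantiate-inside-foreachPartition part mirrors what ArangoSpark.saveRDD does on the master branch.

```scala
import com.arangodb.mapping.ArangoJack
import com.fasterxml.jackson.databind.Module
import org.apache.spark.rdd.RDD

object ArangoJackModulesSketch {
  // Hypothetical helper, not the connector's actual API: only the user-supplied
  // Jackson modules travel with the closure, so they must be Serializable.
  def saveWithModules[T](rdd: RDD[T], modules: Seq[Module with Serializable]): Unit =
    rdd.foreachPartition { docs =>
      // ArangoJack is created here, on the executor, so it never has to be serialized
      // (the same pattern ArangoSpark.saveRDD uses within foreachPartition).
      val serde = new ArangoJack()
      serde.configure(mapper => modules.foreach(m => mapper.registerModule(m)))
      // ... build the ArangoDB driver with `serde` and insert the documents in `docs` ...
    }
}
```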

reynoldsm88 commented 2 years ago

@rashtao thanks for your reply! I was not aware of the new datasource connector. If we are building a greenfield application on top of Spark 3, would you recommend just starting there instead of spending time getting this to work?

rashtao commented 2 years ago

Yes, I would recommend starting directly with ArangoDB Spark Datasource. However, there the serialization mappings are applied at a lower level (Spark SQL InternalRow) and cannot read the Jackson annotations on your classes, so you have to implement the mapping conversions yourself, i.e. using the Dataframe or Dataset API.
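
For illustration, a minimal write through the new datasource could look roughly like the sketch below. The column name extracted_text comes from this thread; the format name and option keys are assumptions that should be verified against the arangodb-spark-datasource docs.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

case class Document(_key: String, extracted_text: String)

object DatasourceWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("arango-datasource-sketch").getOrCreate()
    import spark.implicits._

    // Case-class fields become document fields by name; no Jackson annotations are involved.
    val docs = Seq(Document("doc-1", "some extracted text")).toDS()

    docs.write
      .format("com.arangodb.spark")           // assumed datasource short name
      .option("endpoints", "localhost:8529")  // assumed option keys, check the datasource docs
      .option("database", "mydb")
      .option("table", "documents")
      .option("password", "...")
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}
```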

reynoldsm88 commented 2 years ago

@rashtao okay, so I think I am following you: if we move to the datasource and use the DataFrame / Dataset API, then I don't need to worry about Jackson serializers because the rows will be mapped to the names of the columns?

I.e., if I insert a dataframe with the column extracted_text, then it will appear like that in the Arango document, and if I were to load that exact same document, the column extracted_text would be present in that format?

reynoldsm88 commented 2 years ago

@rashtao sorry to reply bomb, but also, I think the serialization issue in my PR is because I added the serialization option to WriteOptions and ReadOptions respectively. Those classes are evaluated on the driver before being pushed to the workers. I thought I could fix it by making them lazy, but that causes another type of error.

rashtao commented 2 years ago

Yes, rows will be mapped to the names of the dataframe columns. If you need any serialization mapping customization for the field values, e.g. a custom date format, as of now you need to implement it with a dataframe map transformation. I guess the exception is still related to serialization? As you can see on the master branch, instantiating ArangoJack from within the foreachPartition block (so on the executor side) works fine.
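
For example, a custom date format could be handled with a column transformation before the write (hypothetical column name, just one way to express such a mapping with the Dataframe API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_format}

object DateFormatSketch {
  // Render a timestamp column as a formatted string before writing,
  // instead of relying on a Jackson date serializer.
  def withFormattedDate(df: DataFrame): DataFrame =
    df.withColumn("created_at", date_format(col("created_at"), "yyyy-MM-dd'T'HH:mm:ss"))
}
```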

reynoldsm88 commented 2 years ago

Ok thanks for clearing that up.

Let me look into what you mentioned; maybe it is an error on my side.

reynoldsm88 commented 2 years ago

@rashtao I'm going to close this issue; we're going to move forward with arangodb-spark-datasource instead. It doesn't seem worth the effort to introduce this fix into a repository that is soon to be deprecated. Thoughts?

rashtao commented 2 years ago

I agree with you, using arangodb-spark-datasource would be a better choice. To be fair to users still using this library, I will soon cut a new release including the latest commits from the master branch. But after that, this library will be deprecated.

reynoldsm88 commented 2 years ago

Awesome, thanks @rashtao, I'll close this issue.