Stratio / Spark-MongoDB

Spark library for easy MongoDB access
http://www.stratio.com
Apache License 2.0

How to save Date type field with 3.x driver #71

Closed moonlightheng closed 8 years ago

moonlightheng commented 8 years ago

When I save a java.util.Date to MongoDB, it says the Spark SQL schema for type java.util.Date is not supported. When I change to java.sql.Date, it says org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class java.sql.Date.

I have tried asking on Stack Overflow but nobody answered, so I have to ask for help here. Here's my code:

  case class AppDailyStatistic(date: Date, newUser: Long, launchCount: Long)
  val dailyData = new AppDailyStatistic(new Date(DateUtils.getStartOfToday.getTime), newUserCount, launchCount)
  val dataFrame: DataFrame = sqlContext.createDataFrame(sc.parallelize(ListAppDailyStatistic))
  val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "test",
    Collection -> "app_daily_statistic", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal,
    SplitSize -> 8, SplitKey -> "_id"))
  dataFrame.saveToMongodb(saveConfig.build)

pmadrigal commented 8 years ago

Hi @moonlightheng,

I don't know the value of "ListAppDailyStatistic". Do you mean "List(dailyData)"?

Try creating the date in another way. We have inserted dates and timestamps several times and it works.

Here is an example similar to your code that works for me:

  case class AppDailyStatistic(date: java.sql.Date, newUser: Long, launchCount: Long)
  val dailyData = AppDailyStatistic(new java.sql.Date(1), 1l, 2l)
  val dataFrame: DataFrame = sqlContext.createDataFrame(sc.parallelize(List(dailyData)))
  val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "test",
    Collection -> "app_daily_statistic", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal,
    SplitSize -> 8, SplitKey -> "_id"))
  dataFrame.saveToMongodb(saveConfig.build)

Please close the issue if your problem is solved or give us more details.

moonlightheng commented 8 years ago

Thanks for your reply. The value of dailyData is:

  val dailyData = new AppDailyStatistic(new java.sql.Date(DateUtils.getStartOfToday.getTime), newUserCount, launchCount)

and here is the method it uses:

  public static java.util.Date getStartOfToday() {
      DateTime dateTime = new org.joda.time.DateTime();
      return dateTime.toDateMidnight().toDate();
  }

the error stack is:

    2016-01-04 13:40:21 ERROR Executor:96 - Exception in task 3.0 in stage 8.0 (TID 11)
    org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class java.sql.Date.
        at org.bson.codecs.configuration.CodecCache.getOrThrow(CodecCache.java:46)
        at org.bson.codecs.configuration.ProvidersCodecRegistry.get(ProvidersCodecRegistry.java:63)
        at org.bson.codecs.configuration.ProvidersCodecRegistry.get(ProvidersCodecRegistry.java:37)
        at com.mongodb.DBObjectCodec.writeValue(DBObjectCodec.java:210)
        at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:128)
        at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:61)
        at com.mongodb.CompoundDBObjectCodec.encode(CompoundDBObjectCodec.java:48)
        at com.mongodb.CompoundDBObjectCodec.encode(CompoundDBObjectCodec.java:27)
        at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
        at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
        at com.mongodb.connection.RequestMessage.addDocument(RequestMessage.java:216)
        at com.mongodb.connection.RequestMessage.addCollectibleDocument(RequestMessage.java:182)
        at com.mongodb.connection.InsertMessage.encodeMessageBody(InsertMessage.java:63)
        at com.mongodb.connection.RequestMessage.encode(RequestMessage.java:132)
        at com.mongodb.connection.WriteProtocol.sendMessage(WriteProtocol.java:107)
        at com.mongodb.connection.WriteProtocol.execute(WriteProtocol.java:59)
        at com.mongodb.connection.InsertProtocol.execute(InsertProtocol.java:66)
        at com.mongodb.connection.InsertProtocol.execute(InsertProtocol.java:37)
        at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:155)
        at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:219)
        at com.mongodb.connection.DefaultServerConnection.insert(DefaultServerConnection.java:72)
        at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteProtocol(MixedBulkWriteOperation.java:411)
        at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:613)
        at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:363)
        at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:148)
        at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:141)
        at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:186)
        at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:177)
        at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:141)
        at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:72)
        at com.mongodb.Mongo.execute(Mongo.java:745)
        at com.mongodb.Mongo$2.execute(Mongo.java:728)
        at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1968)
        at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:113)
        at com.mongodb.casbah.BulkWriteOperation$$anonfun$2.apply(BulkWriteOperation.scala:91)
        at com.mongodb.casbah.BulkWriteOperation$$anonfun$2.apply(BulkWriteOperation.scala:91)
        at scala.util.Try$.apply(Try.scala:161)
        at com.mongodb.casbah.BulkWriteOperation.execute(BulkWriteOperation.scala:91)
        at com.stratio.datasource.mongodb.writer.MongodbBatchWriter$$anonfun$save$1.apply(MongodbBatchWriter.scala:44)
        at com.stratio.datasource.mongodb.writer.MongodbBatchWriter$$anonfun$save$1.apply(MongodbBatchWriter.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at com.stratio.datasource.mongodb.writer.MongodbBatchWriter.save(MongodbBatchWriter.scala:37)
        at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:82)
        at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:60)
        at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    2016-01-04 13:40:21 WARN ThrowableSerializationWrapper:166 - Task exception could not be deserialized
    java.lang.ClassNotFoundException: org.bson.codecs.configuration.CodecConfigurationException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)

I don't know why I get this error.

pmadrigal commented 8 years ago

Hi @moonlightheng,

We have to use java.sql.Date because it is the supported type for Spark SQL's DateType.

If I create the date value the same way you do, I have no problems. Here is the code:

  val utilDate = new org.joda.time.DateTime().toDateMidnight.toDate
  val longTime: Long = utilDate.getTime
  val sqlDate = new java.sql.Date(longTime)

  case class AppDailyStatistic(date: java.sql.Date, newUser: Long, launchCount: Long)
  val dailyData = AppDailyStatistic(sqlDate, 1l, 2l)
  val dataFrame: DataFrame = sqlContext.createDataFrame(sc.parallelize(List(dailyData)))
  val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "test",
    Collection -> "app_daily_statistic", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal,
    SplitSize -> 8, SplitKey -> "_id"))
  dataFrame.saveToMongodb(saveConfig.build)

moonlightheng commented 8 years ago

More details: Scala version 2.10.4, with these dependencies:

    <dependency>
        <groupId>com.stratio.datasource</groupId>
        <artifactId>spark-mongodb_2.10</artifactId>
        <version>0.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-commons_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-query_2.10</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.1</version>
    </dependency>

Since the 3.x Mongo driver, it seems they removed java.sql.Date support:

http://stackoverflow.com/questions/29940839/error-is-saving-java-sql-date-in-mongodb
http://stackoverflow.com/questions/33800972/how-to-save-java-sql-date-object-in-mongodb

pmadrigal commented 8 years ago

Hi @moonlightheng,

You are right: the 3.x driver doesn't support java.sql.Date, and spark-mongodb needs this type because DataFrames require it. So, if you have MongoDB 3.0 or lower, you can use Casbah 2.8 and Java driver 2.13, and java.sql.Date will work.
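For reference, the suggested downgrade might look something like this in the pom. This is a sketch, not verified against this project: the 2.x Java driver is published as `mongo-java-driver` rather than `mongodb-driver`, and the exact patch versions should be checked on Maven Central.

```xml
<!-- Hypothetical downgrade to the 2.x driver line, which still
     serializes java.sql.Date. Verify exact versions before use. -->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>2.13.0</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>casbah-core_2.10</artifactId>
    <version>2.8.0</version>
</dependency>
```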

Anyway, we haven't adapted to the 3.x drivers yet; this will happen soon and should fix the problem.

Thanks for your feedback!

moonlightheng commented 8 years ago

I used a Long to save the timestamp instead of a date type. I'll wait for you to support the 3.x drivers. Thanks for your project!
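The Long-based workaround can be sketched as follows. This is a minimal illustration, not code from the thread: the field names mirror the original case class, and `fromDate`/`toDate` are hypothetical helpers for converting between java.util.Date and epoch milliseconds.

```scala
// Workaround: store the timestamp as epoch milliseconds (Long) instead of a
// Date field, sidestepping the missing java.sql.Date codec in the 3.x driver.
case class AppDailyStatistic(date: Long, newUser: Long, launchCount: Long)

object AppDailyStatistic {
  // Build a row from a java.util.Date by converting to epoch millis.
  def fromDate(d: java.util.Date, newUser: Long, launchCount: Long): AppDailyStatistic =
    AppDailyStatistic(d.getTime, newUser, launchCount)
}

// When reading back, convert the stored Long into a date again.
def toDate(millis: Long): java.util.Date = new java.util.Date(millis)
```

A row built this way contains only primitive-compatible fields, so both Spark SQL and the Mongo driver can handle it without any date codec.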