data-integrations / database-plugins

Database plugins

MySQL batch source job fails when zeroDateTimeBehavior=CONVERT_TO_NULL is set in connection arguments. #488

Closed (damjad closed this issue 4 months ago)

damjad commented 7 months ago

Context

We have a MySQL table with a timestamp column that has a NOT NULL constraint.

CREATE TABLE `test_zero_date_time_1` (
  `id` int(11) DEFAULT NULL,
  `date_col` timestamp NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

The data in the table is:

+------+---------------------+
| id   | date_col            |
+------+---------------------+
|    1 | 2020-01-01 00:00:00 |
|    2 | 2021-01-01 00:00:00 |
|    3 | 0000-00-00 00:00:00 |
+------+---------------------+

The connection arguments we use are as follows:

zeroDateTimeBehavior=CONVERT_TO_NULL
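
For reference, this argument ends up on the JDBC connection URL. Here is a minimal standalone sketch of the driver-side behavior, assuming a local MySQL instance with the table above; the host, database name, and credentials are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;

public class ZeroDateDemo {
  public static void main(String[] args) throws Exception {
    // zeroDateTimeBehavior=CONVERT_TO_NULL makes Connector/J return null
    // for zero dates instead of throwing an exception.
    String url = "jdbc:mysql://localhost:3306/test?zeroDateTimeBehavior=CONVERT_TO_NULL";
    try (Connection conn = DriverManager.getConnection(url, "user", "password");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT id, date_col FROM test_zero_date_time_1")) {
      while (rs.next()) {
        Timestamp ts = rs.getTimestamp("date_col"); // null for the id=3 row
        System.out.println(rs.getInt("id") + " -> " + ts);
      }
    }
  }
}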

Issue

When we try to ingest data from MySQL and push it to BigQuery, the ingestion fails with the following error:

io.cdap.cdap.api.data.format.UnexpectedFormatException: field date_col cannot be set to a null value.

So, the problem is that the MySQL JDBC driver converts the zero timestamp in the row with id=3 to null, and the plugin's schema validation then rejects the record because date_col is declared non-nullable.
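
The failure can be reproduced directly against the CDAP record API. A minimal sketch, with the schema written out by hand to mirror the table above (in the pipeline, the plugin derives it from the database metadata):

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

public class NullFieldDemo {
  public static void main(String[] args) {
    // date_col is non-nullable because of the NOT NULL constraint
    // on the MySQL column.
    Schema schema = Schema.recordOf("test_zero_date_time_1",
        Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.INT))),
        Schema.Field.of("date_col", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)));

    // This is effectively what DBRecord.setField does for the id=3 row once
    // the driver has converted the zero timestamp to null: Builder.set
    // throws UnexpectedFormatException ("field date_col cannot be set to a
    // null value").
    StructuredRecord.builder(schema)
        .set("id", 3)
        .set("date_col", null)
        .build();
  }
}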

The whole stack trace is as follows:

Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:833)
    at io.cdap.cdap.etl.spark.batch.RDDUtils.saveHadoopDataset(RDDUtils.java:58)
    at io.cdap.cdap.etl.spark.batch.RDDUtils.saveUsingOutputFormat(RDDUtils.java:47)
    at io.cdap.cdap.etl.spark.batch.SparkBatchSinkFactory.writeFromRDD(SparkBatchSinkFactory.java:200)
    at io.cdap.cdap.etl.spark.batch.BaseRDDCollection$1.run(BaseRDDCollection.java:238)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.executeSinkRunnables(SparkPipelineRunner.java:210)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.processDag(SparkPipelineRunner.java:202)
    at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:183)
    at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:260)
    at io.cdap.cdap.app.runtime.spark.SparkTransactional$2.run(SparkTransactional.java:236)
    at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:208)
    at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:138)
    at io.cdap.cdap.app.runtime.spark.AbstractSparkExecutionContext.execute(AbstractSparkExecutionContext.scala:231)
    at io.cdap.cdap.app.runtime.spark.SerializableSparkExecutionContext.execute(SerializableSparkExecutionContext.scala:63)
    at io.cdap.cdap.app.runtime.spark.DefaultJavaSparkExecutionContext.execute(DefaultJavaSparkExecutionContext.scala:94)
    at io.cdap.cdap.api.Transactionals.execute(Transactionals.java:63)
    at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:189)
    at io.cdap.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:88)
    at io.cdap.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (cdap-mysqlfull-xxx.europe-west1-d.c.xxxx.internal executor 2): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:162)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.cdap.cdap.api.data.format.UnexpectedFormatException: field date_col cannot be set to a null value.
    at io.cdap.cdap.api.data.format.StructuredRecord$Builder.validateAndGetField(StructuredRecord.java:675)
    at io.cdap.cdap.api.data.format.StructuredRecord$Builder.set(StructuredRecord.java:371)
    at io.cdap.plugin.db.DBRecord.setField(DBRecord.java:164)
    at io.cdap.plugin.db.DBRecord.handleField(DBRecord.java:139)
    at io.cdap.plugin.db.DBRecord.readFields(DBRecord.java:112)
    at org.apache.hadoop.mapreduce.lib.db.DBRecordReader.nextKeyValue(DBRecordReader.java:236)
    at io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat$1.nextKeyValue(DataDrivenETLDBInputFormat.java:142)
    at io.cdap.cdap.etl.spark.io.TrackingRecordReader.nextKeyValue(TrackingRecordReader.java:47)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:135)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
    ... 9 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2257)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
    ... 30 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:162)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: io.cdap.cdap.api.data.format.UnexpectedFormatException: field date_col cannot be set to a null value.
    at io.cdap.cdap.api.data.format.StructuredRecord$Builder.validateAndGetField(StructuredRecord.java:675)
    at io.cdap.cdap.api.data.format.StructuredRecord$Builder.set(StructuredRecord.java:371)
    at io.cdap.plugin.db.DBRecord.setField(DBRecord.java:164)
    at io.cdap.plugin.db.DBRecord.handleField(DBRecord.java:139)
    at io.cdap.plugin.db.DBRecord.readFields(DBRecord.java:112)
    at org.apache.hadoop.mapreduce.lib.db.DBRecordReader.nextKeyValue(DBRecordReader.java:236)
    at io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat$1.nextKeyValue(DataDrivenETLDBInputFormat.java:142)
    at io.cdap.cdap.etl.spark.io.TrackingRecordReader.nextKeyValue(TrackingRecordReader.java:47)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:135)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
    ... 9 more
damjad commented 7 months ago

@itsankit-google, please assign this to me. I have found the cause, and preliminary results of my change look good. I'll put together a proper fix soon.
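
For anyone who hits this before picking up the fix: the root of the mismatch is that the source derives a non-nullable schema from the NOT NULL column, while CONVERT_TO_NULL means the driver can still return null. A sketch of one possible direction, using a hypothetical helper (this is for illustration only and is not necessarily the change in the linked PR):

import java.sql.Types;
import java.util.Map;

import io.cdap.cdap.api.data.schema.Schema;

public class ZeroDateSchemaHint {

  // Hypothetical helper: widen date/time columns to nullable when the
  // connection arguments tell the driver to convert zero dates to null.
  static Schema columnSchema(Schema baseType, int sqlType,
                             Map<String, String> connectionArguments) {
    boolean zeroDateToNull = "CONVERT_TO_NULL".equalsIgnoreCase(
        connectionArguments.get("zeroDateTimeBehavior"));
    boolean isDateTime = sqlType == Types.DATE
        || sqlType == Types.TIME
        || sqlType == Types.TIMESTAMP;
    if (zeroDateToNull && isDateTime && !baseType.isNullable()) {
      return Schema.nullableOf(baseType);
    }
    return baseType;
  }
}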

damjad commented 7 months ago

Please have a look at https://github.com/data-integrations/database-plugins/pull/489

damjad commented 4 months ago

Thank you so much for merging it. Do you know when it will be available in DF? I have built a fixed version on top of an old one, so there's no rush; just curious.

itsankit-google commented 4 months ago

> Thank you so much for merging it. Do you know when it will be available in DF? I have built a fixed version on top of an old one, so there's no rush; just curious.

With the current setup, it will go into the next minor release, which should land somewhere in Q3-Q4 this year. If it is urgent, we can release the MySQL plugin separately on the Hub with the fix for the latest released DF version.

damjad commented 4 months ago

Thanks. I have built the plugin locally and uploaded the JAR for now. It's not that urgent, so I'm closing the issue.

Thanks for all the help!