apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Not valid month error when pulling new data from Oracle DB using HoodieDeltaStreamer #9460

Open haitham-eltaweel opened 1 year ago

haitham-eltaweel commented 1 year ago

When running HoodieDeltaStreamer to pull new data incrementally from an Oracle DB to AWS S3, we get the following error: ORA-01843: not a valid month

We couldn't find a way to change the default NLS_DATE_FORMAT to 'YYYY-MM-DD HH24:MI:SS' on the client side, either through the JDBC connection or through Spark configuration.

To Reproduce

Run the following spark-submit command (I replaced some configuration values with placeholders):

spark-submit \
  --master yarn \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --num-executors 15 \
  --executor-cores 5 \
  --executor-memory 30g \
  --driver-memory 15g \
  --name job_name \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --queue q_name \
  --deploy-mode cluster \
  /home/hadoop/hudi-utilities-bundle_2.12-0.11.0-amzn-0.jar \
  --table-type MERGE_ON_READ \
  --target-base-path s3a://bucket-name/path \
  --target-table target_table_name \
  --enable-sync \
  --enable-hive-sync \
  --sync-tool-classes org.apache.hudi.hive.HiveSyncTool \
  --source-class org.apache.hudi.utilities.sources.JdbcSource \
  --source-ordering-field MODIFID_DT \
  --op UPSERT \
  --hoodie-conf hoodie.deltastreamer.jdbc.password=pass_value \
  --hoodie-conf hoodie.deltastreamer.jdbc.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host_name.com)(PORT=1521))(LOAD_BALANCE=YES)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=SER_NM))) \
  --hoodie-conf hoodie.deltastreamer.jdbc.user=user_name \
  --hoodie-conf hoodie.deltastreamer.jdbc.table.name=schema_name.table_name \
  --hoodie-conf hoodie.deltastreamer.jdbc.incr.pull=true \
  --hoodie-conf hoodie.datasource.write.recordkey.field=pk_id \
  --hoodie-conf hoodie.datasource.write.precombine.field=MODIFID_DT \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=MODIFID_DT \
  --hoodie-conf hoodie.deltastreamer.jdbc.driver.class=oracle.jdbc.driver.OracleDriver \
  --hoodie-conf hoodie.deltastreamer.jdbc.table.incr.column.name=MODIFID_DT \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer

Note: MODIFID_DT is of type TIMESTAMP in the DB table.

Expected behavior

Data will be pulled from the source table to the destination with no date format error.

Environment Description

Amazon EMR version: emr-6.7.0. Installed apps: Spark 3.2.1, Hive 3.1.3, Sqoop 1.4.7, Hadoop 3.2.1. Hudi: 0.11

Stacktrace

Caused by: java.sql.SQLDataException: ORA-01843: not a valid month

    at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494)
    at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:446)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1054)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:623)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:252)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:612)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:226)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:59)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:747)
    at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:904)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1082)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3780)
    at oracle.jdbc.driver.T4CPreparedStatement.executeInternal(T4CPreparedStatement.java:1343)
    at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3822)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1165)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:358)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    Caused by: Error : 1843, Position : 333, Sql = SELECT "CASUALTY_MEDICAL_PROVIDER_ID","INCIDENT_ID","PROVIDER_TYPE_CD","OTHER_PROVIDER_DESC","NUM_OF_VISITS","TREATMENT_START_DATE","TREATMENT_END_DATE","INCURRED_MEDICAL_EXPENSE","USER_ID","APPL_ID","REC_DT","MODIFID_DT","MEDICAL_EXPENSE_UNKNOWN_FLAG" FROM (select * from schema_name.table_name where  MODIFID_DT > '2023-08-01 17:55:38') rdbms_table  , OriginalSql = SELECT "CASUALTY_MEDICAL_PROVIDER_ID","INCIDENT_ID","PROVIDER_TYPE_CD","OTHER_PROVIDER_DESC","NUM_OF_VISITS","TREATMENT_START_DATE","TREATMENT_END_DATE","INCURRED_MEDICAL_EXPENSE","USER_ID","APPL_ID","REC_DT","MODIFID_DT","MEDICAL_EXPENSE_UNKNOWN_FLAG" FROM (select * from schema_name.table_name where  MODIFID_DT > '2023-08-01 17:55:38') rdbms_table  , Error Msg = ORA-01843: not a valid month
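
The statement in the trace compares MODIFID_DT against a bare string literal, so Oracle has to convert that string implicitly using the session's NLS date/timestamp format, which is where ORA-01843 can come from. The following is a minimal sketch of the difference between that filter and one that states the format mask explicitly with TO_TIMESTAMP. The helper names are hypothetical and this is not Hudi's own code; it only builds the SQL text for comparison.

```python
def implicit_predicate(column: str, checkpoint: str) -> str:
    """Filter shaped like the one in the stack trace: a bare string literal.

    Oracle must convert the literal implicitly, so the result depends on
    the session's NLS settings.
    """
    return f"{column} > '{checkpoint}'"


def explicit_predicate(column: str, checkpoint: str,
                       mask: str = "YYYY-MM-DD HH24:MI:SS") -> str:
    """Same filter, but the format mask is spelled out with TO_TIMESTAMP,

    so the conversion no longer depends on session defaults.
    """
    return f"{column} > TO_TIMESTAMP('{checkpoint}', '{mask}')"


def incremental_query(table: str, predicate: str) -> str:
    """Wrap a predicate in a subquery similar to the one shown in the trace."""
    return f"(select * from {table} where {predicate}) rdbms_table"


if __name__ == "__main__":
    checkpoint = "2023-08-01 17:55:38"  # value taken from the stack trace
    print(incremental_query("schema_name.table_name",
                            implicit_predicate("MODIFID_DT", checkpoint)))
    print(incremental_query("schema_name.table_name",
                            explicit_predicate("MODIFID_DT", checkpoint)))
```

Running the second form by hand against the Oracle source is one way to confirm that the literal conversion, rather than the data itself, triggers the error.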

ad1happy2go commented 1 year ago

@haitham-eltaweel What date format is MODIFID_DT stored in on the Oracle side? What is its data type?

haitham-eltaweel commented 1 year ago

> @haitham-eltaweel What date format is MODIFID_DT stored in on the Oracle side? What is its data type?

It is a TIMESTAMP column. The values have the format 'YYYY-MM-DD HH24:MI:SS'.

ad1happy2go commented 1 year ago

@haitham-eltaweel I don't think there is a way to set the default date format. We may want to add a new config, something like hoodie.deltastreamer.jdbc.incr.predicate, that accepts a custom predicate for cases where the source DB doesn't support the default one. I created a JIRA for this:

https://issues.apache.org/jira/browse/HUDI-6727

Feel free to contribute if you'd like.
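
To make the proposal concrete, here is a rough sketch of how a configurable predicate could behave: a template is read from the proposed property and falls back to the current bare-literal comparison when the property is absent. The property hoodie.deltastreamer.jdbc.incr.predicate is only the name suggested in this thread, and the {column} / {checkpoint} placeholder syntax and the function name are assumptions made for illustration, not an existing Hudi API.

```python
# Property name proposed in this thread; it is an assumption, not an existing config.
PREDICATE_KEY = "hoodie.deltastreamer.jdbc.incr.predicate"

# Fallback mirrors the bare-literal comparison seen in the stack trace.
DEFAULT_TEMPLATE = "{column} > '{checkpoint}'"


def render_incr_predicate(props: dict, column: str, checkpoint: str) -> str:
    """Render the incremental filter from a user-supplied template.

    The {column} and {checkpoint} placeholders are a hypothetical syntax.
    When the property is not set, the default template is used.
    """
    template = props.get(PREDICATE_KEY, DEFAULT_TEMPLATE)
    return template.format(column=column, checkpoint=checkpoint)


if __name__ == "__main__":
    oracle_props = {
        PREDICATE_KEY:
            "{column} > TO_TIMESTAMP('{checkpoint}', 'YYYY-MM-DD HH24:MI:SS')"
    }
    # Custom template for a source that needs an explicit format mask.
    print(render_incr_predicate(oracle_props, "MODIFID_DT", "2023-08-01 17:55:38"))
    # No override: falls back to the default comparison.
    print(render_incr_predicate({}, "MODIFID_DT", "2023-08-01 17:55:38"))
```

A template keeps the default behavior unchanged for sources that already work, while letting Oracle users supply an explicit conversion.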